package main import ( "encoding/json" "fmt" "io/ioutil" "os" "os/signal" "sort" "sync" "syscall" "time" wr "github.com/mroth/weightedrand" "gopkg.in/yaml.v2" ) const ( cachedShuffle = "cached-shuffle" sticky = "sticky" ) type proxyInst struct { URL string Weight int StatusHistory []bool } type ProxyManager struct { Proxys []proxyInst Cache map[string]int LoadBalanceMode string CacheCleanInterval int DynamicWeightFile string Chooser wr.Chooser enabledIndics []int mux sync.Mutex } type proxyConf struct { Proxys []struct { URL string `yaml:"url"` Weight int `yaml:"weight"` } `yaml:"proxy"` LoadBalanceMode string `yaml:"load-balance-mode"` CacheCleanInterval int `yaml:"cache-clean-interval"` DynamicWeightFile string `yaml:"dynamic-weight-file"` } type dynamicWeight struct { Weights []int `json:"weights"` } func NewPM(confFp string) (*ProxyManager, error) { pm := ProxyManager{} f, err := os.Open(confFp) if err != nil { return &ProxyManager{}, err } defer f.Close() var cfg proxyConf decoder := yaml.NewDecoder(f) err = decoder.Decode(&cfg) if err != nil { return &ProxyManager{}, err } for _, pc := range cfg.Proxys { var pi proxyInst pi.URL = pc.URL pi.Weight = pc.Weight pm.Proxys = append(pm.Proxys, pi) } pm.applyWeight() pm.LoadBalanceMode = cfg.LoadBalanceMode pm.CacheCleanInterval = cfg.CacheCleanInterval pm.DynamicWeightFile = cfg.DynamicWeightFile pm.Cache = make(map[string]int) go pm.keepClearingCache() go pm.dynamicWeightLoader() return &pm, nil } func (pm *ProxyManager) DescribeLoadBalanceMode() string { if pm.LoadBalanceMode == sticky { return fmt.Sprintf("sticky with cache clearing interval: %ds", pm.CacheCleanInterval) } else if pm.LoadBalanceMode == cachedShuffle { return "cached shuffling" } else { return "randomize every connection" } } func (pm *ProxyManager) applyWeight() { // update Chooser and enabledIndics var chooseArr []wr.Choice previousEnabledIndics := pm.enabledIndics pm.enabledIndics = nil for idx, p := range pm.Proxys { if p.Weight > 0 { pm.enabledIndics = append(pm.enabledIndics, idx) chooseArr = append(chooseArr, wr.Choice{Item: len(pm.enabledIndics) - 1, Weight: uint(p.Weight)}) } } if len(pm.enabledIndics) == 0 { fmt.Println("no enabled indice, proxy will be disabled.") return } fmt.Println("enabled indices:", pm.enabledIndics) pm.Chooser = wr.NewChooser(chooseArr...) // update cache to remove disabled proxys for addr, idx := range pm.Cache { i := previousEnabledIndics[idx] pos := sort.SearchInts(pm.enabledIndics, i) if !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i { // found in current enabled list pm.Cache[addr] = pos } else { // not found in current enabled list pm.Cache[addr] = pm.Chooser.Pick().(int) } } } func (pm *ProxyManager) Get(addr string) (string, bool) { pm.mux.Lock() defer pm.mux.Unlock() // fmt.Println(pm.Cache) if len(pm.enabledIndics) == 0 { // return error if no enabled proxy return "", false } if pm.LoadBalanceMode == sticky { idx, ok := pm.Cache[addr] if ok { // fmt.Println("match addr", addr, "using:", idx) return pm.Proxys[pm.enabledIndics[idx]].URL, true } else { idx := pm.Chooser.Pick().(int) pm.Cache[addr] = idx return pm.Proxys[pm.enabledIndics[idx]].URL, true } } else if pm.LoadBalanceMode == cachedShuffle { idx, ok := pm.Cache[addr] if ok { pm.Cache[addr] = (idx + 1) % (len(pm.enabledIndics)) return pm.Proxys[pm.enabledIndics[pm.Cache[addr]]].URL, true } else { idx := pm.Chooser.Pick().(int) pm.Cache[addr] = idx return pm.Proxys[pm.enabledIndics[idx]].URL, true } } else { idx := pm.Chooser.Pick().(int) return pm.Proxys[pm.enabledIndics[idx]].URL, true } } func (pm *ProxyManager) ClearCache() { // fmt.Println("clearing cache") pm.mux.Lock() pm.Cache = make(map[string]int) pm.mux.Unlock() // fmt.Println("after:", pm.Cache) } func (pm *ProxyManager) keepClearingCache() { interval := pm.CacheCleanInterval if interval <= 0 { return } for { time.Sleep(time.Duration(interval) * time.Second) pm.ClearCache() } } func (pm *ProxyManager) dynamicWeightLoader() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGTERM) go func() { for { <-sigs fmt.Println("Loading new weights") file, err := ioutil.ReadFile(pm.DynamicWeightFile) if err != nil { fmt.Println("Error loading new weights, error:", err) continue } dw := dynamicWeight{} err = json.Unmarshal([]byte(file), &dw) if err != nil { fmt.Println("Error loading new weights, json parsing error:", err) continue } // check if number of weights is the same as the number of proxys configured if len(dw.Weights) != len(pm.Proxys) { fmt.Println("Error loading new weights, unexpected number of weights") continue } // apply the weights pm.mux.Lock() for idx, weight := range dw.Weights { pm.Proxys[idx].Weight = weight } pm.applyWeight() pm.mux.Unlock() } }() }