package main import ( "container/list" "encoding/json" "fmt" "io/ioutil" "os" "os/signal" "sort" "sync" "syscall" wr "github.com/mroth/weightedrand" "gopkg.in/yaml.v2" ) const ( cachedShuffle = "cached-shuffle" sticky = "sticky" fallback = "fallback" ) type queueMapElement struct { i int e *list.Element } type queueMap struct { m map[string]*queueMapElement l *list.List } func NewQueueMap() *queueMap { var qm queueMap qm.l = list.New() qm.m = make(map[string]*queueMapElement) return &qm } func (qm *queueMap) Set(key string, value int) { qme, present := qm.m[key] if present { // key already in qm, bring it to the head qm.l.MoveToFront(qme.e) qme.i = value } else { // key not present, add to the head e := qm.l.PushFront(key) newQME := queueMapElement{i: value, e: e} qm.m[key] = &newQME } } func (qm *queueMap) Get(key string) (int, bool) { value, present := qm.m[key] if present { return value.i, present } return 0, false } func (qm *queueMap) Trim(maxLength int) { nToTrim := qm.l.Len() - maxLength if nToTrim > 0 { for i := 0; i < nToTrim; i++ { e := qm.l.Back() key := (e.Value).(string) delete(qm.m, key) qm.l.Remove(e) } } } func (qm *queueMap) SetTrim(key string, value int, maxLength int) { qm.Set(key, value) qm.Trim(maxLength) } func (qm *queueMap) MapFilter(f func(int) int) { e := qm.l.Front() for e != nil { key := e.Value.(string) newValue := f(qm.m[key].i) if newValue < 0 { // need to remove delete(qm.m, key) ne := e.Next() qm.l.Remove(e) e = ne } else { qm.m[key].i = newValue e = e.Next() } } } type proxyInst struct { URL string Weight int StatusHistory []bool } type ProxyManager struct { Proxys []proxyInst Cache *queueMap LoadBalanceMode string CacheMaxLength int DynamicWeightFile string Chooser wr.Chooser enabledIndics []int mux sync.Mutex } type proxyConf struct { Proxys []struct { URL string `yaml:"url"` Weight int `yaml:"weight"` } `yaml:"proxy"` LoadBalanceMode string `yaml:"load-balance-mode"` CacheMaxLength int `yaml:"cache-max-length"` DynamicWeightFile string `yaml:"dynamic-weight-file"` } type dynamicWeight struct { Weights []int `json:"weights"` } func NewPM(confFp string) (*ProxyManager, error) { pm := ProxyManager{} f, err := os.Open(confFp) if err != nil { return &ProxyManager{}, err } defer f.Close() var cfg proxyConf decoder := yaml.NewDecoder(f) err = decoder.Decode(&cfg) if err != nil { return &ProxyManager{}, err } for _, pc := range cfg.Proxys { var pi proxyInst pi.URL = pc.URL pi.Weight = pc.Weight pm.Proxys = append(pm.Proxys, pi) } pm.LoadBalanceMode = cfg.LoadBalanceMode pm.CacheMaxLength = cfg.CacheMaxLength pm.DynamicWeightFile = cfg.DynamicWeightFile pm.Cache = NewQueueMap() pm.applyWeight() // go pm.keepClearingCache() pm.dynamicWeightLoader() pm.listenToClearCache() return &pm, nil } func (pm *ProxyManager) DescribeLoadBalanceMode() string { if pm.LoadBalanceMode == sticky { return fmt.Sprintf("sticky with maximum cache size of %d", pm.CacheMaxLength) } else if pm.LoadBalanceMode == cachedShuffle { return fmt.Sprintf("cached shuffle with maximum cache size of %d", pm.CacheMaxLength) } else if pm.LoadBalanceMode == fallback { return "fallback mode" } else { return "randomize every connection" } } func (pm *ProxyManager) applyWeight() { // update Chooser and enabledIndics var chooseArr []wr.Choice previousEnabledIndics := pm.enabledIndics pm.enabledIndics = nil for idx, p := range pm.Proxys { if p.Weight > 0 { pm.enabledIndics = append(pm.enabledIndics, idx) chooseArr = append(chooseArr, wr.Choice{Item: len(pm.enabledIndics) - 1, Weight: uint(p.Weight)}) } } if len(pm.enabledIndics) == 0 { fmt.Println("no enabled indice, proxy will be disabled.") pm.Cache = NewQueueMap() return } fmt.Println("enabled indices:", pm.enabledIndics) pm.Chooser = wr.NewChooser(chooseArr...) // update cache to remove disabled proxys filterFn := func(idx int) int { i := previousEnabledIndics[idx] pos := sort.SearchInts(pm.enabledIndics, i) shouldRemain := !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i if shouldRemain { // found in current enabled indics, use it return pos } else { return -1 } } pm.Cache.MapFilter(filterFn) } func (pm *ProxyManager) Get(addr string) (string, bool) { pm.mux.Lock() defer pm.mux.Unlock() // fmt.Println(pm.Cache)IsIn if pm.LoadBalanceMode == sticky { idx, ok := pm.Cache.Get(addr) if ok { // fmt.Println("match addr", addr, "using:", idx) return pm.Proxys[pm.enabledIndics[idx]].URL, true } else { idx := pm.Chooser.Pick().(int) pm.Cache.SetTrim(addr, idx, pm.CacheMaxLength) return pm.Proxys[pm.enabledIndics[idx]].URL, true } } else if pm.LoadBalanceMode == cachedShuffle { idx, ok := pm.Cache.Get(addr) if ok { idx = (idx + 1) % (len(pm.enabledIndics)) } else { idx = pm.Chooser.Pick().(int) } pm.Cache.SetTrim(addr, idx, pm.CacheMaxLength) return pm.Proxys[pm.enabledIndics[idx]].URL, true } else if pm.LoadBalanceMode == fallback { addr := "*" idx, ok := pm.Cache.Get(addr) if !ok { idx = pm.Chooser.Pick().(int) pm.Cache.SetTrim(addr, idx, pm.CacheMaxLength) } return pm.Proxys[pm.enabledIndics[idx]].URL, true } else { idx := pm.Chooser.Pick().(int) return pm.Proxys[pm.enabledIndics[idx]].URL, true } } func (pm *ProxyManager) listenToClearCache() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGUSR2) //, syscall.SIGTERM) go func() { for { <-sigs pm.mux.Lock() pm.Cache = NewQueueMap() pm.mux.Unlock() } }() } func (pm *ProxyManager) dynamicWeightLoader() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGUSR1) //, syscall.SIGTERM) go func() { for { <-sigs fmt.Println("Loading new weights") file, err := ioutil.ReadFile(pm.DynamicWeightFile) if err != nil { fmt.Println("Error loading new weights, error:", err) continue } dw := dynamicWeight{} err = json.Unmarshal([]byte(file), &dw) if err != nil { fmt.Println("Error loading new weights, json parsing error:", err) continue } // check if number of weights is the same as the number of proxys configured if len(dw.Weights) != len(pm.Proxys) { fmt.Println("Error loading new weights, unexpected number of weights") continue } // apply the weights pm.mux.Lock() for idx, weight := range dw.Weights { pm.Proxys[idx].Weight = weight } pm.applyWeight() pm.mux.Unlock() } }() }