277 lines
6.5 KiB
Go
277 lines
6.5 KiB
Go
package main
|
|
|
|
import (
|
|
"container/list"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"os/signal"
|
|
"sort"
|
|
"sync"
|
|
"syscall"
|
|
|
|
wr "github.com/mroth/weightedrand"
|
|
"gopkg.in/yaml.v2"
|
|
)
|
|
|
|
// Values recognised in the configuration's load-balance-mode field. Any other
// value makes ProxyManager.Get pick a weighted-random proxy on every call.
const (
	// cachedShuffle remembers the proxy last handed to an address and moves
	// on to the next enabled proxy each time that address asks again.
	cachedShuffle = "cached-shuffle"
	// sticky keeps handing an address the proxy it was first given, for as
	// long as the address stays in the cache.
	sticky = "sticky"
	// fallback shares one cached choice between all addresses.
	fallback = "fallback"
)
|
|
|
|
// queueMapElement is the per-key record held by a queueMap.
type queueMapElement struct {
	i int           // value stored for the key
	e *list.Element // the key's node in queueMap.l; its Value is the key string
}

// queueMap is a string-to-int map that also remembers how recently each key
// was written: Set moves the key to the front of l and Trim evicts from the
// back. Get leaves the order untouched. The type does no locking of its own.
type queueMap struct {
	m map[string]*queueMapElement // key -> record
	l *list.List                  // keys, most recently Set first
}

// NewQueueMap returns an empty, ready-to-use queueMap.
func NewQueueMap() *queueMap {
	return &queueMap{
		m: make(map[string]*queueMapElement),
		l: list.New(),
	}
}

// Set stores value under key and marks key as the most recently written entry.
func (qm *queueMap) Set(key string, value int) {
	if qme, present := qm.m[key]; present {
		// Known key: refresh its value and its position.
		qme.i = value
		qm.l.MoveToFront(qme.e)
		return
	}
	// New key: it enters at the front of the queue.
	qm.m[key] = &queueMapElement{i: value, e: qm.l.PushFront(key)}
}

// Get returns the value stored under key and whether the key is present.
// It does not change the key's position in the eviction order.
func (qm *queueMap) Get(key string) (int, bool) {
	qme, present := qm.m[key]
	if !present {
		return 0, false
	}
	return qme.i, true
}

// Trim evicts the least recently written keys until at most maxLength remain.
// A maxLength of zero or less empties the queueMap.
func (qm *queueMap) Trim(maxLength int) {
	for qm.l.Len() > maxLength {
		oldest := qm.l.Back()
		if oldest == nil {
			// Empty list: only reachable with a negative maxLength, which
			// previously led to a nil dereference here.
			return
		}
		delete(qm.m, oldest.Value.(string))
		qm.l.Remove(oldest)
	}
}

// SetTrim performs Set(key, value) followed by Trim(maxLength).
func (qm *queueMap) SetTrim(key string, value int, maxLength int) {
	qm.Set(key, value)
	qm.Trim(maxLength)
}

// MapFilter replaces every stored value v with f(v), visiting keys from most
// to least recently written. Entries for which f returns a negative number
// are removed instead of updated.
func (qm *queueMap) MapFilter(f func(int) int) {
	for e := qm.l.Front(); e != nil; {
		next := e.Next() // read before a possible Remove, which detaches e
		key := e.Value.(string)
		qme := qm.m[key]
		if mapped := f(qme.i); mapped < 0 {
			delete(qm.m, key)
			qm.l.Remove(e)
		} else {
			qme.i = mapped
		}
		e = next
	}
}
|
|
|
|
// proxyInst is one upstream proxy known to a ProxyManager.
type proxyInst struct {
	URL           string // proxy address, copied from the configuration's url field
	Weight        int    // selection weight; only proxies with Weight > 0 are enabled
	StatusHistory []bool // not referenced by the code visible here
}
|
|
type ProxyManager struct {
|
|
Proxys []proxyInst
|
|
Cache *queueMap
|
|
LoadBalanceMode string
|
|
CacheMaxLength int
|
|
DynamicWeightFile string
|
|
Chooser wr.Chooser
|
|
enabledIndics []int
|
|
mux sync.Mutex
|
|
}
|
|
// proxyConf mirrors the YAML configuration file that NewPM decodes.
type proxyConf struct {
	// Proxys is the list found under the "proxy" key, one entry per upstream.
	Proxys []struct {
		URL    string `yaml:"url"`
		Weight int    `yaml:"weight"`
	} `yaml:"proxy"`
	LoadBalanceMode   string `yaml:"load-balance-mode"`   // copied to ProxyManager.LoadBalanceMode
	CacheMaxLength    int    `yaml:"cache-max-length"`    // copied to ProxyManager.CacheMaxLength
	DynamicWeightFile string `yaml:"dynamic-weight-file"` // copied to ProxyManager.DynamicWeightFile
}
|
|
// dynamicWeight is the JSON document read from ProxyManager.DynamicWeightFile.
type dynamicWeight struct {
	// Weights holds one weight per configured proxy, in configuration order;
	// the loader rejects documents whose length differs from the proxy count.
	Weights []int `json:"weights"`
}
|
|
|
|
func NewPM(confFp string) (*ProxyManager, error) {
|
|
pm := ProxyManager{}
|
|
f, err := os.Open(confFp)
|
|
if err != nil {
|
|
return &ProxyManager{}, err
|
|
}
|
|
defer f.Close()
|
|
|
|
var cfg proxyConf
|
|
decoder := yaml.NewDecoder(f)
|
|
err = decoder.Decode(&cfg)
|
|
if err != nil {
|
|
return &ProxyManager{}, err
|
|
}
|
|
for _, pc := range cfg.Proxys {
|
|
var pi proxyInst
|
|
pi.URL = pc.URL
|
|
pi.Weight = pc.Weight
|
|
pm.Proxys = append(pm.Proxys, pi)
|
|
}
|
|
pm.LoadBalanceMode = cfg.LoadBalanceMode
|
|
pm.CacheMaxLength = cfg.CacheMaxLength
|
|
pm.DynamicWeightFile = cfg.DynamicWeightFile
|
|
pm.Cache = NewQueueMap()
|
|
pm.applyWeight()
|
|
// go pm.keepClearingCache()
|
|
pm.dynamicWeightLoader()
|
|
pm.listenToClearCache()
|
|
return &pm, nil
|
|
}
|
|
func (pm *ProxyManager) DescribeLoadBalanceMode() string {
|
|
if pm.LoadBalanceMode == sticky {
|
|
return fmt.Sprintf("sticky with maximum cache size of %d", pm.CacheMaxLength)
|
|
} else if pm.LoadBalanceMode == cachedShuffle {
|
|
return fmt.Sprintf("cached shuffle with maximum cache size of %d", pm.CacheMaxLength)
|
|
} else if pm.LoadBalanceMode == fallback {
|
|
return "fallback mode"
|
|
} else {
|
|
return "randomize every connection"
|
|
}
|
|
}
|
|
func (pm *ProxyManager) applyWeight() {
|
|
// update Chooser and enabledIndics
|
|
var chooseArr []wr.Choice
|
|
previousEnabledIndics := pm.enabledIndics
|
|
pm.enabledIndics = nil
|
|
for idx, p := range pm.Proxys {
|
|
if p.Weight > 0 {
|
|
pm.enabledIndics = append(pm.enabledIndics, idx)
|
|
chooseArr = append(chooseArr, wr.Choice{Item: len(pm.enabledIndics) - 1, Weight: uint(p.Weight)})
|
|
}
|
|
}
|
|
if len(pm.enabledIndics) == 0 {
|
|
fmt.Println("no enabled indice, proxy will be disabled.")
|
|
pm.Cache = NewQueueMap()
|
|
return
|
|
}
|
|
fmt.Println("enabled indices:", pm.enabledIndics)
|
|
pm.Chooser = wr.NewChooser(chooseArr...)
|
|
// update cache to remove disabled proxys
|
|
filterFn := func(idx int) int {
|
|
i := previousEnabledIndics[idx]
|
|
pos := sort.SearchInts(pm.enabledIndics, i)
|
|
shouldRemain := !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i
|
|
if shouldRemain {
|
|
// found in current enabled indics, use it
|
|
return pos
|
|
} else {
|
|
return -1
|
|
}
|
|
}
|
|
pm.Cache.MapFilter(filterFn)
|
|
}
|
|
func (pm *ProxyManager) Get(addr string) (string, bool) {
|
|
pm.mux.Lock()
|
|
defer pm.mux.Unlock()
|
|
// fmt.Println(pm.Cache)IsIn
|
|
if pm.LoadBalanceMode == sticky {
|
|
idx, ok := pm.Cache.Get(addr)
|
|
if ok {
|
|
// fmt.Println("match addr", addr, "using:", idx)
|
|
return pm.Proxys[pm.enabledIndics[idx]].URL, true
|
|
} else {
|
|
idx := pm.Chooser.Pick().(int)
|
|
pm.Cache.SetTrim(addr, idx, pm.CacheMaxLength)
|
|
return pm.Proxys[pm.enabledIndics[idx]].URL, true
|
|
}
|
|
} else if pm.LoadBalanceMode == cachedShuffle {
|
|
idx, ok := pm.Cache.Get(addr)
|
|
if ok {
|
|
idx = (idx + 1) % (len(pm.enabledIndics))
|
|
} else {
|
|
idx = pm.Chooser.Pick().(int)
|
|
}
|
|
pm.Cache.SetTrim(addr, idx, pm.CacheMaxLength)
|
|
return pm.Proxys[pm.enabledIndics[idx]].URL, true
|
|
} else if pm.LoadBalanceMode == fallback {
|
|
addr := "*"
|
|
idx, ok := pm.Cache.Get(addr)
|
|
if !ok {
|
|
idx = pm.Chooser.Pick().(int)
|
|
pm.Cache.SetTrim(addr, idx, pm.CacheMaxLength)
|
|
}
|
|
return pm.Proxys[pm.enabledIndics[idx]].URL, true
|
|
} else {
|
|
idx := pm.Chooser.Pick().(int)
|
|
return pm.Proxys[pm.enabledIndics[idx]].URL, true
|
|
}
|
|
}
|
|
|
|
func (pm *ProxyManager) listenToClearCache() {
|
|
sigs := make(chan os.Signal, 1)
|
|
signal.Notify(sigs, syscall.SIGUSR2) //, syscall.SIGTERM)
|
|
go func() {
|
|
for {
|
|
<-sigs
|
|
pm.mux.Lock()
|
|
pm.Cache = NewQueueMap()
|
|
pm.mux.Unlock()
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (pm *ProxyManager) dynamicWeightLoader() {
|
|
sigs := make(chan os.Signal, 1)
|
|
signal.Notify(sigs, syscall.SIGUSR1) //, syscall.SIGTERM)
|
|
go func() {
|
|
for {
|
|
<-sigs
|
|
fmt.Println("Loading new weights")
|
|
file, err := ioutil.ReadFile(pm.DynamicWeightFile)
|
|
if err != nil {
|
|
fmt.Println("Error loading new weights, error:", err)
|
|
continue
|
|
}
|
|
dw := dynamicWeight{}
|
|
err = json.Unmarshal([]byte(file), &dw)
|
|
if err != nil {
|
|
fmt.Println("Error loading new weights, json parsing error:", err)
|
|
continue
|
|
}
|
|
// check if number of weights is the same as the number of proxys configured
|
|
if len(dw.Weights) != len(pm.Proxys) {
|
|
fmt.Println("Error loading new weights, unexpected number of weights")
|
|
continue
|
|
}
|
|
// apply the weights
|
|
pm.mux.Lock()
|
|
for idx, weight := range dw.Weights {
|
|
pm.Proxys[idx].Weight = weight
|
|
}
|
|
pm.applyWeight()
|
|
pm.mux.Unlock()
|
|
}
|
|
}()
|
|
}
|