From b87bd9c2b81085709ea5c598f228ab875ed00de4 Mon Sep 17 00:00:00 2001 From: mantaohuang Date: Fri, 10 Apr 2020 17:14:01 -0400 Subject: [PATCH] refactor go code --- go-socks-lb/main.go | 271 +----------------------------------- go-socks-lb/manager.go | 204 +++++++++++++++++++++++++++ go-socks-lb/proxychecker.go | 81 +++++++++++ 3 files changed, 287 insertions(+), 269 deletions(-) create mode 100644 go-socks-lb/manager.go create mode 100644 go-socks-lb/proxychecker.go diff --git a/go-socks-lb/main.go b/go-socks-lb/main.go index b02b211..9acd215 100644 --- a/go-socks-lb/main.go +++ b/go-socks-lb/main.go @@ -1,274 +1,14 @@ package main import ( - "encoding/json" "flag" "fmt" - "io/ioutil" "math/rand" - "net/http" - "net/url" "os" - "os/signal" - "sort" - "sync" - "syscall" "time" - - wr "github.com/mroth/weightedrand" - "golang.org/x/net/proxy" - "gopkg.in/yaml.v2" ) -const ( - URL = "http://connectivitycheck.gstatic.com/generate_204" - cachedShuffle = "cached-shuffle" - sticky = "sticky" -) - -func proxyTestStatusCode(proxyURL string, URL string, StatusCode int) bool { - // create a socks5 dialer - u, err := url.Parse(proxyURL) - if err != nil { - fmt.Println("error parsing") - return false - } - dialer, err := proxy.FromURL(u, proxy.Direct) - if err != nil { - fmt.Fprintln(os.Stderr, "can't connect to the proxy:", err) - return false - } - // setup a http client - httpTransport := &http.Transport{} - httpClient := &http.Client{ - Transport: httpTransport, - CheckRedirect: func(req *http.Request, via []*http.Request) error { - fmt.Printf("redirect %v", *req) - return http.ErrUseLastResponse - }, - } - // set our socks5 as the dialer - httpTransport.Dial = dialer.Dial - // create a request - req, err := http.NewRequest("GET", URL, nil) - if err != nil { - fmt.Fprintln(os.Stderr, "can't create request:", err) - return false - } - // use the http client to fetch the page - resp, err := httpClient.Do(req) - if err != nil { - fmt.Fprintln(os.Stderr, "can't GET page:", err) - return false - } - // defer resp.Body.Close() - // b, err := ioutil.ReadAll(resp.Body) - // if err != nil { - // fmt.Fprintln(os.Stderr, "error reading body:", err) - // return false - // } - return resp.StatusCode == StatusCode -} - -func testTask(testfn func() bool, interval int, stop chan bool) { - for { - go testfn() - select { - case <-stop: - fmt.Println("stopping") - return - default: - } - time.Sleep(time.Duration(interval) * time.Second) - } -} - -type proxyInst struct { - URL string - Weight int - StatusHistory []bool -} -type ProxyManager struct { - Proxys []proxyInst - Cache map[string]int - LoadBalanceMode string - CacheCleanInterval int - DynamicWeightFile string - Chooser wr.Chooser - enabledIndics []int - mux sync.Mutex -} -type proxyConf struct { - Proxys []struct { - URL string `yaml:"url"` - Weight int `yaml:"weight"` - } `yaml:"proxy"` - LoadBalanceMode string `yaml:"load-balance-mode"` - CacheCleanInterval int `yaml:"cache-clean-interval"` - DynamicWeightFile string `yaml:"dynamic-weight-file"` -} -type dynamicWeight struct { - Weights []int `json:"weights"` -} - -func NewPM(confFp string) (*ProxyManager, error) { - pm := ProxyManager{} - f, err := os.Open(confFp) - if err != nil { - return &ProxyManager{}, err - } - defer f.Close() - - var cfg proxyConf - decoder := yaml.NewDecoder(f) - err = decoder.Decode(&cfg) - if err != nil { - return &ProxyManager{}, err - } - for _, pc := range cfg.Proxys { - var pi proxyInst - pi.URL = pc.URL - pi.Weight = pc.Weight - pm.Proxys = append(pm.Proxys, pi) - } - pm.applyWeight() - pm.LoadBalanceMode = cfg.LoadBalanceMode - pm.CacheCleanInterval = cfg.CacheCleanInterval - pm.DynamicWeightFile = cfg.DynamicWeightFile - pm.Cache = make(map[string]int) - go pm.keepClearingCache() - go pm.dynamicWeightLoader() - return &pm, nil -} -func (pm *ProxyManager) applyWeight() { - // update Chooser and enabledIndics - var chooseArr []wr.Choice - previousEnabledIndics := pm.enabledIndics - pm.enabledIndics = nil - for idx, p := range pm.Proxys { - if p.Weight > 0 { - pm.enabledIndics = append(pm.enabledIndics, idx) - chooseArr = append(chooseArr, wr.Choice{Item: len(pm.enabledIndics) - 1, Weight: uint(p.Weight)}) - } - } - if len(pm.enabledIndics) == 0 { - fmt.Println("no enabled indice, proxy will be disabled.") - return - } - fmt.Println("enabled indices:", pm.enabledIndics) - pm.Chooser = wr.NewChooser(chooseArr...) - // update cache to remove disabled proxys - for addr, idx := range pm.Cache { - i := previousEnabledIndics[idx] - pos := sort.SearchInts(pm.enabledIndics, i) - if !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i { - // found in current enabled list - pm.Cache[addr] = pos - } else { - // not found in current enabled list - pm.Cache[addr] = pm.Chooser.Pick().(int) - } - } -} -func (pm *ProxyManager) Get(addr string) (string, bool) { - pm.mux.Lock() - defer pm.mux.Unlock() - // fmt.Println(pm.Cache) - if len(pm.enabledIndics) == 0 { - // return error if no enabled proxy - return "", false - } - if pm.LoadBalanceMode == sticky { - idx, ok := pm.Cache[addr] - if ok { - // fmt.Println("match addr", addr, "using:", idx) - return pm.Proxys[pm.enabledIndics[idx]].URL, true - } else { - idx := pm.Chooser.Pick().(int) - pm.Cache[addr] = idx - return pm.Proxys[pm.enabledIndics[idx]].URL, true - } - } else if pm.LoadBalanceMode == cachedShuffle { - idx, ok := pm.Cache[addr] - if ok { - pm.Cache[addr] = (idx + 1) % (len(pm.enabledIndics)) - return pm.Proxys[pm.enabledIndics[pm.Cache[addr]]].URL, true - } else { - idx := pm.Chooser.Pick().(int) - pm.Cache[addr] = idx - return pm.Proxys[pm.enabledIndics[idx]].URL, true - } - } else { - idx := pm.Chooser.Pick().(int) - return pm.Proxys[pm.enabledIndics[idx]].URL, true - } -} - -func (pm *ProxyManager) ClearCache() { - // fmt.Println("clearing cache") - pm.mux.Lock() - pm.Cache = make(map[string]int) - pm.mux.Unlock() - // fmt.Println("after:", pm.Cache) -} - -func (pm *ProxyManager) keepClearingCache() { - interval := pm.CacheCleanInterval - if interval <= 0 { - return - } - for { - time.Sleep(time.Duration(interval) * time.Second) - pm.ClearCache() - } -} - -func (pm *ProxyManager) dynamicWeightLoader() { - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGTERM) - go func() { - for { - <-sigs - fmt.Println("Loading new weights") - file, err := ioutil.ReadFile(pm.DynamicWeightFile) - if err != nil { - fmt.Println("Error loading new weights, error:", err) - continue - } - dw := dynamicWeight{} - err = json.Unmarshal([]byte(file), &dw) - if err != nil { - fmt.Println("Error loading new weights, json parsing error:", err) - continue - } - // check if number of weights is the same as the number of proxys configured - if len(dw.Weights) != len(pm.Proxys) { - fmt.Println("Error loading new weights, unexpected number of weights") - continue - } - // apply the weights - pm.mux.Lock() - for idx, weight := range dw.Weights { - pm.Proxys[idx].Weight = weight - } - pm.applyWeight() - pm.mux.Unlock() - } - }() -} - func main() { - // testFn := func() bool { - // status := proxyTestStatusCode("socks5://127.0.0.1:1080", URL, 204) - // fmt.Println(status) - // return status - // } - // stop := make(chan bool) - // go testTask(testFn, 5, stop) - // time.Sleep(10 * time.Second) - // stop <- true - // time.Sleep(10 * time.Second) - configPath := flag.String("config", "", "Config file.") bindAddr := flag.String("bind", "127.0.0.1:7000", "Bind address and port") flag.Parse() @@ -280,18 +20,11 @@ func main() { var pm *ProxyManager pm, err := NewPM(*configPath) - fmt.Println("pm:", pm) + // fmt.Println("pm:", pm) if err != nil { return } - fmt.Printf("Load balance mode: ") - if pm.LoadBalanceMode == sticky { - fmt.Printf("sticky with cache clearing interval: %ds.\n", pm.CacheCleanInterval) - } else if pm.LoadBalanceMode == cachedShuffle { - fmt.Println("cached shuffling.") - } else { - fmt.Println("randomize every connection.") - } + fmt.Printf("Load balance mode: %s.\n", pm.DescribeLoadBalanceMode()) conf := &Config{PM: pm} server, err := New(conf) if err != nil { diff --git a/go-socks-lb/manager.go b/go-socks-lb/manager.go new file mode 100644 index 0000000..d559a91 --- /dev/null +++ b/go-socks-lb/manager.go @@ -0,0 +1,204 @@ +package main + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "os/signal" + "sort" + "sync" + "syscall" + "time" + + wr "github.com/mroth/weightedrand" + "gopkg.in/yaml.v2" +) + +const ( + cachedShuffle = "cached-shuffle" + sticky = "sticky" +) + +type proxyInst struct { + URL string + Weight int + StatusHistory []bool +} +type ProxyManager struct { + Proxys []proxyInst + Cache map[string]int + LoadBalanceMode string + CacheCleanInterval int + DynamicWeightFile string + Chooser wr.Chooser + enabledIndics []int + mux sync.Mutex +} +type proxyConf struct { + Proxys []struct { + URL string `yaml:"url"` + Weight int `yaml:"weight"` + } `yaml:"proxy"` + LoadBalanceMode string `yaml:"load-balance-mode"` + CacheCleanInterval int `yaml:"cache-clean-interval"` + DynamicWeightFile string `yaml:"dynamic-weight-file"` +} +type dynamicWeight struct { + Weights []int `json:"weights"` +} + +func NewPM(confFp string) (*ProxyManager, error) { + pm := ProxyManager{} + f, err := os.Open(confFp) + if err != nil { + return &ProxyManager{}, err + } + defer f.Close() + + var cfg proxyConf + decoder := yaml.NewDecoder(f) + err = decoder.Decode(&cfg) + if err != nil { + return &ProxyManager{}, err + } + for _, pc := range cfg.Proxys { + var pi proxyInst + pi.URL = pc.URL + pi.Weight = pc.Weight + pm.Proxys = append(pm.Proxys, pi) + } + pm.applyWeight() + pm.LoadBalanceMode = cfg.LoadBalanceMode + pm.CacheCleanInterval = cfg.CacheCleanInterval + pm.DynamicWeightFile = cfg.DynamicWeightFile + pm.Cache = make(map[string]int) + go pm.keepClearingCache() + go pm.dynamicWeightLoader() + return &pm, nil +} +func (pm *ProxyManager) DescribeLoadBalanceMode() string { + if pm.LoadBalanceMode == sticky { + return fmt.Sprintf("sticky with cache clearing interval: %ds", pm.CacheCleanInterval) + } else if pm.LoadBalanceMode == cachedShuffle { + return "cached shuffling" + } else { + return "randomize every connection" + } +} +func (pm *ProxyManager) applyWeight() { + // update Chooser and enabledIndics + var chooseArr []wr.Choice + previousEnabledIndics := pm.enabledIndics + pm.enabledIndics = nil + for idx, p := range pm.Proxys { + if p.Weight > 0 { + pm.enabledIndics = append(pm.enabledIndics, idx) + chooseArr = append(chooseArr, wr.Choice{Item: len(pm.enabledIndics) - 1, Weight: uint(p.Weight)}) + } + } + if len(pm.enabledIndics) == 0 { + fmt.Println("no enabled indice, proxy will be disabled.") + return + } + fmt.Println("enabled indices:", pm.enabledIndics) + pm.Chooser = wr.NewChooser(chooseArr...) + // update cache to remove disabled proxys + for addr, idx := range pm.Cache { + i := previousEnabledIndics[idx] + pos := sort.SearchInts(pm.enabledIndics, i) + if !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i { + // found in current enabled list + pm.Cache[addr] = pos + } else { + // not found in current enabled list + pm.Cache[addr] = pm.Chooser.Pick().(int) + } + } +} +func (pm *ProxyManager) Get(addr string) (string, bool) { + pm.mux.Lock() + defer pm.mux.Unlock() + // fmt.Println(pm.Cache) + if len(pm.enabledIndics) == 0 { + // return error if no enabled proxy + return "", false + } + if pm.LoadBalanceMode == sticky { + idx, ok := pm.Cache[addr] + if ok { + // fmt.Println("match addr", addr, "using:", idx) + return pm.Proxys[pm.enabledIndics[idx]].URL, true + } else { + idx := pm.Chooser.Pick().(int) + pm.Cache[addr] = idx + return pm.Proxys[pm.enabledIndics[idx]].URL, true + } + } else if pm.LoadBalanceMode == cachedShuffle { + idx, ok := pm.Cache[addr] + if ok { + pm.Cache[addr] = (idx + 1) % (len(pm.enabledIndics)) + return pm.Proxys[pm.enabledIndics[pm.Cache[addr]]].URL, true + } else { + idx := pm.Chooser.Pick().(int) + pm.Cache[addr] = idx + return pm.Proxys[pm.enabledIndics[idx]].URL, true + } + } else { + idx := pm.Chooser.Pick().(int) + return pm.Proxys[pm.enabledIndics[idx]].URL, true + } +} + +func (pm *ProxyManager) ClearCache() { + // fmt.Println("clearing cache") + pm.mux.Lock() + pm.Cache = make(map[string]int) + pm.mux.Unlock() + // fmt.Println("after:", pm.Cache) +} + +func (pm *ProxyManager) keepClearingCache() { + interval := pm.CacheCleanInterval + if interval <= 0 { + return + } + for { + time.Sleep(time.Duration(interval) * time.Second) + pm.ClearCache() + } +} + +func (pm *ProxyManager) dynamicWeightLoader() { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGTERM) + go func() { + for { + <-sigs + fmt.Println("Loading new weights") + file, err := ioutil.ReadFile(pm.DynamicWeightFile) + if err != nil { + fmt.Println("Error loading new weights, error:", err) + continue + } + dw := dynamicWeight{} + err = json.Unmarshal([]byte(file), &dw) + if err != nil { + fmt.Println("Error loading new weights, json parsing error:", err) + continue + } + // check if number of weights is the same as the number of proxys configured + if len(dw.Weights) != len(pm.Proxys) { + fmt.Println("Error loading new weights, unexpected number of weights") + continue + } + // apply the weights + pm.mux.Lock() + for idx, weight := range dw.Weights { + pm.Proxys[idx].Weight = weight + } + pm.applyWeight() + pm.mux.Unlock() + } + }() +} diff --git a/go-socks-lb/proxychecker.go b/go-socks-lb/proxychecker.go new file mode 100644 index 0000000..9f42d02 --- /dev/null +++ b/go-socks-lb/proxychecker.go @@ -0,0 +1,81 @@ +package main + +import ( + "fmt" + "net/http" + "net/url" + "os" + "time" + + "golang.org/x/net/proxy" +) + +const URL = "http://connectivitycheck.gstatic.com/generate_204" + +func proxyTestStatusCode(proxyURL string, URL string, StatusCode int) bool { + // create a socks5 dialer + u, err := url.Parse(proxyURL) + if err != nil { + fmt.Println("error parsing") + return false + } + dialer, err := proxy.FromURL(u, proxy.Direct) + if err != nil { + fmt.Fprintln(os.Stderr, "can't connect to the proxy:", err) + return false + } + // setup a http client + httpTransport := &http.Transport{} + httpClient := &http.Client{ + Transport: httpTransport, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + fmt.Printf("redirect %v", *req) + return http.ErrUseLastResponse + }, + } + // set our socks5 as the dialer + httpTransport.Dial = dialer.Dial + // create a request + req, err := http.NewRequest("GET", URL, nil) + if err != nil { + fmt.Fprintln(os.Stderr, "can't create request:", err) + return false + } + // use the http client to fetch the page + resp, err := httpClient.Do(req) + if err != nil { + fmt.Fprintln(os.Stderr, "can't GET page:", err) + return false + } + // defer resp.Body.Close() + // b, err := ioutil.ReadAll(resp.Body) + // if err != nil { + // fmt.Fprintln(os.Stderr, "error reading body:", err) + // return false + // } + return resp.StatusCode == StatusCode +} + +// testFn := func() bool { +// status := proxyTestStatusCode("socks5://127.0.0.1:1080", URL, 204) +// fmt.Println(status) +// return status +// } +// stop := make(chan bool) +// go testTask(testFn, 5, stop) +// time.Sleep(10 * time.Second) +// stop <- true +// time.Sleep(10 * time.Second) + +func testTask(testfn func() bool, interval int, stop chan bool) { + for { + go testfn() + select { + case <-stop: + fmt.Println("stopping") + return + default: + } + time.Sleep(time.Duration(interval) * time.Second) + } +}