package main import ( "encoding/json" "flag" "fmt" "io/ioutil" "math/rand" "net/http" "net/url" "os" "os/signal" "sort" "sync" "syscall" "time" wr "github.com/mroth/weightedrand" "golang.org/x/net/proxy" "gopkg.in/yaml.v2" ) const ( URL = "http://connectivitycheck.gstatic.com/generate_204" cachedShuffle = "cached-shuffle" sticky = "sticky" ) func proxyTestStatusCode(proxyURL string, URL string, StatusCode int) bool { // create a socks5 dialer u, err := url.Parse(proxyURL) if err != nil { fmt.Println("error parsing") return false } dialer, err := proxy.FromURL(u, proxy.Direct) if err != nil { fmt.Fprintln(os.Stderr, "can't connect to the proxy:", err) return false } // setup a http client httpTransport := &http.Transport{} httpClient := &http.Client{ Transport: httpTransport, CheckRedirect: func(req *http.Request, via []*http.Request) error { fmt.Printf("redirect %v", *req) return http.ErrUseLastResponse }, } // set our socks5 as the dialer httpTransport.Dial = dialer.Dial // create a request req, err := http.NewRequest("GET", URL, nil) if err != nil { fmt.Fprintln(os.Stderr, "can't create request:", err) return false } // use the http client to fetch the page resp, err := httpClient.Do(req) if err != nil { fmt.Fprintln(os.Stderr, "can't GET page:", err) return false } // defer resp.Body.Close() // b, err := ioutil.ReadAll(resp.Body) // if err != nil { // fmt.Fprintln(os.Stderr, "error reading body:", err) // return false // } return resp.StatusCode == StatusCode } func testTask(testfn func() bool, interval int, stop chan bool) { for { go testfn() select { case <-stop: fmt.Println("stopping") return default: } time.Sleep(time.Duration(interval) * time.Second) } } type proxyInst struct { URL string Weight int StatusHistory []bool } type ProxyManager struct { Proxys []proxyInst Cache map[string]int LoadBalanceMode string CacheCleanInterval int DynamicWeightFile string Chooser wr.Chooser enabledIndics []int mux sync.Mutex } type proxyConf struct { Proxys []struct { URL string `yaml:"url"` Weight int `yaml:"weight"` } `yaml:"proxy"` LoadBalanceMode string `yaml:"load-balance-mode"` CacheCleanInterval int `yaml:"cache-clean-interval"` DynamicWeightFile string `yaml:"dynamic-weight-file"` } type dynamicWeight struct { Weights []int `json:"weights"` } func NewPM(confFp string) (*ProxyManager, error) { pm := ProxyManager{} f, err := os.Open(confFp) if err != nil { return &ProxyManager{}, err } defer f.Close() var cfg proxyConf decoder := yaml.NewDecoder(f) err = decoder.Decode(&cfg) if err != nil { return &ProxyManager{}, err } for _, pc := range cfg.Proxys { var pi proxyInst pi.URL = pc.URL pi.Weight = pc.Weight pm.Proxys = append(pm.Proxys, pi) } pm.applyWeight() pm.LoadBalanceMode = cfg.LoadBalanceMode pm.CacheCleanInterval = cfg.CacheCleanInterval pm.DynamicWeightFile = cfg.DynamicWeightFile pm.Cache = make(map[string]int) go pm.keepClearingCache() go pm.dynamicWeightLoader() return &pm, nil } func (pm *ProxyManager) applyWeight() { // update Chooser and enabledIndics var chooseArr []wr.Choice previousEnabledIndics := pm.enabledIndics pm.enabledIndics = nil for idx, p := range pm.Proxys { if p.Weight > 0 { pm.enabledIndics = append(pm.enabledIndics, idx) chooseArr = append(chooseArr, wr.Choice{Item: len(pm.enabledIndics) - 1, Weight: uint(p.Weight)}) } } if len(pm.enabledIndics) == 0 { fmt.Println("no enabled indice, proxy will be disabled.") return } fmt.Println("enabled indices:", pm.enabledIndics) pm.Chooser = wr.NewChooser(chooseArr...) // update cache to remove disabled proxys for addr, idx := range pm.Cache { i := previousEnabledIndics[idx] pos := sort.SearchInts(pm.enabledIndics, i) if !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i { // found in current enabled list pm.Cache[addr] = pos } else { // not found in current enabled list pm.Cache[addr] = pm.Chooser.Pick().(int) } } } func (pm *ProxyManager) Get(addr string) (string, bool) { pm.mux.Lock() defer pm.mux.Unlock() // fmt.Println(pm.Cache) if len(pm.enabledIndics) == 0 { // return error if no enabled proxy return "", false } if pm.LoadBalanceMode == sticky { idx, ok := pm.Cache[addr] if ok { // fmt.Println("match addr", addr, "using:", idx) return pm.Proxys[pm.enabledIndics[idx]].URL, true } else { idx := pm.Chooser.Pick().(int) pm.Cache[addr] = idx return pm.Proxys[pm.enabledIndics[idx]].URL, true } } else if pm.LoadBalanceMode == cachedShuffle { idx, ok := pm.Cache[addr] if ok { pm.Cache[addr] = (idx + 1) % (len(pm.enabledIndics)) return pm.Proxys[pm.enabledIndics[pm.Cache[addr]]].URL, true } else { idx := pm.Chooser.Pick().(int) pm.Cache[addr] = idx return pm.Proxys[pm.enabledIndics[idx]].URL, true } } else { idx := pm.Chooser.Pick().(int) return pm.Proxys[pm.enabledIndics[idx]].URL, true } } func (pm *ProxyManager) ClearCache() { // fmt.Println("clearing cache") pm.mux.Lock() pm.Cache = make(map[string]int) pm.mux.Unlock() // fmt.Println("after:", pm.Cache) } func (pm *ProxyManager) keepClearingCache() { interval := pm.CacheCleanInterval if interval <= 0 { return } for { time.Sleep(time.Duration(interval) * time.Second) pm.ClearCache() } } func (pm *ProxyManager) dynamicWeightLoader() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGTERM) go func() { for { <-sigs fmt.Println("Loading new weights") file, err := ioutil.ReadFile(pm.DynamicWeightFile) if err != nil { fmt.Println("Error loading new weights, error:", err) continue } dw := dynamicWeight{} err = json.Unmarshal([]byte(file), &dw) if err != nil { fmt.Println("Error loading new weights, json parsing error:", err) continue } // check if number of weights is the same as the number of proxys configured if len(dw.Weights) != len(pm.Proxys) { fmt.Println("Error loading new weights, unexpected number of weights") continue } // apply the weights pm.mux.Lock() for idx, weight := range dw.Weights { pm.Proxys[idx].Weight = weight } pm.applyWeight() pm.mux.Unlock() } }() } func main() { // testFn := func() bool { // status := proxyTestStatusCode("socks5://127.0.0.1:1080", URL, 204) // fmt.Println(status) // return status // } // stop := make(chan bool) // go testTask(testFn, 5, stop) // time.Sleep(10 * time.Second) // stop <- true // time.Sleep(10 * time.Second) configPath := flag.String("config", "", "Config file.") bindAddr := flag.String("bind", "127.0.0.1:7000", "Bind address and port") flag.Parse() if *configPath == "" { flag.PrintDefaults() os.Exit(1) } rand.Seed(time.Now().UTC().UnixNano()) // always seed random! var pm *ProxyManager pm, err := NewPM(*configPath) fmt.Println("pm:", pm) if err != nil { return } fmt.Printf("Load balance mode: ") if pm.LoadBalanceMode == sticky { fmt.Printf("sticky with cache clearing interval: %ds.\n", pm.CacheCleanInterval) } else if pm.LoadBalanceMode == cachedShuffle { fmt.Println("cached shuffling.") } else { fmt.Println("randomize every connection.") } conf := &Config{PM: pm} server, err := New(conf) if err != nil { panic(err) } // Create SOCKS5 proxy on localhost port 8000 if err := server.ListenAndServe("tcp", *bindAddr); err != nil { panic(err) } }