refactor go code

This commit is contained in:
mantaohuang 2020-04-10 17:14:01 -04:00
parent 8c5d4e2f55
commit b87bd9c2b8
3 changed files with 287 additions and 269 deletions

View File

@ -1,274 +1,14 @@
package main
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"os"
"os/signal"
"sort"
"sync"
"syscall"
"time"
wr "github.com/mroth/weightedrand"
"golang.org/x/net/proxy"
"gopkg.in/yaml.v2"
)
const (
URL = "http://connectivitycheck.gstatic.com/generate_204"
cachedShuffle = "cached-shuffle"
sticky = "sticky"
)
func proxyTestStatusCode(proxyURL string, URL string, StatusCode int) bool {
// create a socks5 dialer
u, err := url.Parse(proxyURL)
if err != nil {
fmt.Println("error parsing")
return false
}
dialer, err := proxy.FromURL(u, proxy.Direct)
if err != nil {
fmt.Fprintln(os.Stderr, "can't connect to the proxy:", err)
return false
}
// setup a http client
httpTransport := &http.Transport{}
httpClient := &http.Client{
Transport: httpTransport,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
fmt.Printf("redirect %v", *req)
return http.ErrUseLastResponse
},
}
// set our socks5 as the dialer
httpTransport.Dial = dialer.Dial
// create a request
req, err := http.NewRequest("GET", URL, nil)
if err != nil {
fmt.Fprintln(os.Stderr, "can't create request:", err)
return false
}
// use the http client to fetch the page
resp, err := httpClient.Do(req)
if err != nil {
fmt.Fprintln(os.Stderr, "can't GET page:", err)
return false
}
// defer resp.Body.Close()
// b, err := ioutil.ReadAll(resp.Body)
// if err != nil {
// fmt.Fprintln(os.Stderr, "error reading body:", err)
// return false
// }
return resp.StatusCode == StatusCode
}
func testTask(testfn func() bool, interval int, stop chan bool) {
for {
go testfn()
select {
case <-stop:
fmt.Println("stopping")
return
default:
}
time.Sleep(time.Duration(interval) * time.Second)
}
}
type proxyInst struct {
URL string
Weight int
StatusHistory []bool
}
type ProxyManager struct {
Proxys []proxyInst
Cache map[string]int
LoadBalanceMode string
CacheCleanInterval int
DynamicWeightFile string
Chooser wr.Chooser
enabledIndics []int
mux sync.Mutex
}
type proxyConf struct {
Proxys []struct {
URL string `yaml:"url"`
Weight int `yaml:"weight"`
} `yaml:"proxy"`
LoadBalanceMode string `yaml:"load-balance-mode"`
CacheCleanInterval int `yaml:"cache-clean-interval"`
DynamicWeightFile string `yaml:"dynamic-weight-file"`
}
type dynamicWeight struct {
Weights []int `json:"weights"`
}
func NewPM(confFp string) (*ProxyManager, error) {
pm := ProxyManager{}
f, err := os.Open(confFp)
if err != nil {
return &ProxyManager{}, err
}
defer f.Close()
var cfg proxyConf
decoder := yaml.NewDecoder(f)
err = decoder.Decode(&cfg)
if err != nil {
return &ProxyManager{}, err
}
for _, pc := range cfg.Proxys {
var pi proxyInst
pi.URL = pc.URL
pi.Weight = pc.Weight
pm.Proxys = append(pm.Proxys, pi)
}
pm.applyWeight()
pm.LoadBalanceMode = cfg.LoadBalanceMode
pm.CacheCleanInterval = cfg.CacheCleanInterval
pm.DynamicWeightFile = cfg.DynamicWeightFile
pm.Cache = make(map[string]int)
go pm.keepClearingCache()
go pm.dynamicWeightLoader()
return &pm, nil
}
func (pm *ProxyManager) applyWeight() {
// update Chooser and enabledIndics
var chooseArr []wr.Choice
previousEnabledIndics := pm.enabledIndics
pm.enabledIndics = nil
for idx, p := range pm.Proxys {
if p.Weight > 0 {
pm.enabledIndics = append(pm.enabledIndics, idx)
chooseArr = append(chooseArr, wr.Choice{Item: len(pm.enabledIndics) - 1, Weight: uint(p.Weight)})
}
}
if len(pm.enabledIndics) == 0 {
fmt.Println("no enabled indice, proxy will be disabled.")
return
}
fmt.Println("enabled indices:", pm.enabledIndics)
pm.Chooser = wr.NewChooser(chooseArr...)
// update cache to remove disabled proxys
for addr, idx := range pm.Cache {
i := previousEnabledIndics[idx]
pos := sort.SearchInts(pm.enabledIndics, i)
if !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i {
// found in current enabled list
pm.Cache[addr] = pos
} else {
// not found in current enabled list
pm.Cache[addr] = pm.Chooser.Pick().(int)
}
}
}
func (pm *ProxyManager) Get(addr string) (string, bool) {
pm.mux.Lock()
defer pm.mux.Unlock()
// fmt.Println(pm.Cache)
if len(pm.enabledIndics) == 0 {
// return error if no enabled proxy
return "", false
}
if pm.LoadBalanceMode == sticky {
idx, ok := pm.Cache[addr]
if ok {
// fmt.Println("match addr", addr, "using:", idx)
return pm.Proxys[pm.enabledIndics[idx]].URL, true
} else {
idx := pm.Chooser.Pick().(int)
pm.Cache[addr] = idx
return pm.Proxys[pm.enabledIndics[idx]].URL, true
}
} else if pm.LoadBalanceMode == cachedShuffle {
idx, ok := pm.Cache[addr]
if ok {
pm.Cache[addr] = (idx + 1) % (len(pm.enabledIndics))
return pm.Proxys[pm.enabledIndics[pm.Cache[addr]]].URL, true
} else {
idx := pm.Chooser.Pick().(int)
pm.Cache[addr] = idx
return pm.Proxys[pm.enabledIndics[idx]].URL, true
}
} else {
idx := pm.Chooser.Pick().(int)
return pm.Proxys[pm.enabledIndics[idx]].URL, true
}
}
func (pm *ProxyManager) ClearCache() {
// fmt.Println("clearing cache")
pm.mux.Lock()
pm.Cache = make(map[string]int)
pm.mux.Unlock()
// fmt.Println("after:", pm.Cache)
}
func (pm *ProxyManager) keepClearingCache() {
interval := pm.CacheCleanInterval
if interval <= 0 {
return
}
for {
time.Sleep(time.Duration(interval) * time.Second)
pm.ClearCache()
}
}
func (pm *ProxyManager) dynamicWeightLoader() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGTERM)
go func() {
for {
<-sigs
fmt.Println("Loading new weights")
file, err := ioutil.ReadFile(pm.DynamicWeightFile)
if err != nil {
fmt.Println("Error loading new weights, error:", err)
continue
}
dw := dynamicWeight{}
err = json.Unmarshal([]byte(file), &dw)
if err != nil {
fmt.Println("Error loading new weights, json parsing error:", err)
continue
}
// check if number of weights is the same as the number of proxys configured
if len(dw.Weights) != len(pm.Proxys) {
fmt.Println("Error loading new weights, unexpected number of weights")
continue
}
// apply the weights
pm.mux.Lock()
for idx, weight := range dw.Weights {
pm.Proxys[idx].Weight = weight
}
pm.applyWeight()
pm.mux.Unlock()
}
}()
}
func main() {
// testFn := func() bool {
// status := proxyTestStatusCode("socks5://127.0.0.1:1080", URL, 204)
// fmt.Println(status)
// return status
// }
// stop := make(chan bool)
// go testTask(testFn, 5, stop)
// time.Sleep(10 * time.Second)
// stop <- true
// time.Sleep(10 * time.Second)
configPath := flag.String("config", "", "Config file.")
bindAddr := flag.String("bind", "127.0.0.1:7000", "Bind address and port")
flag.Parse()
@ -280,18 +20,11 @@ func main() {
var pm *ProxyManager
pm, err := NewPM(*configPath)
fmt.Println("pm:", pm)
// fmt.Println("pm:", pm)
if err != nil {
return
}
fmt.Printf("Load balance mode: ")
if pm.LoadBalanceMode == sticky {
fmt.Printf("sticky with cache clearing interval: %ds.\n", pm.CacheCleanInterval)
} else if pm.LoadBalanceMode == cachedShuffle {
fmt.Println("cached shuffling.")
} else {
fmt.Println("randomize every connection.")
}
fmt.Printf("Load balance mode: %s.\n", pm.DescribeLoadBalanceMode())
conf := &Config{PM: pm}
server, err := New(conf)
if err != nil {

204
go-socks-lb/manager.go Normal file
View File

@ -0,0 +1,204 @@
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/signal"
"sort"
"sync"
"syscall"
"time"
wr "github.com/mroth/weightedrand"
"gopkg.in/yaml.v2"
)
const (
cachedShuffle = "cached-shuffle"
sticky = "sticky"
)
type proxyInst struct {
URL string
Weight int
StatusHistory []bool
}
type ProxyManager struct {
Proxys []proxyInst
Cache map[string]int
LoadBalanceMode string
CacheCleanInterval int
DynamicWeightFile string
Chooser wr.Chooser
enabledIndics []int
mux sync.Mutex
}
type proxyConf struct {
Proxys []struct {
URL string `yaml:"url"`
Weight int `yaml:"weight"`
} `yaml:"proxy"`
LoadBalanceMode string `yaml:"load-balance-mode"`
CacheCleanInterval int `yaml:"cache-clean-interval"`
DynamicWeightFile string `yaml:"dynamic-weight-file"`
}
type dynamicWeight struct {
Weights []int `json:"weights"`
}
func NewPM(confFp string) (*ProxyManager, error) {
pm := ProxyManager{}
f, err := os.Open(confFp)
if err != nil {
return &ProxyManager{}, err
}
defer f.Close()
var cfg proxyConf
decoder := yaml.NewDecoder(f)
err = decoder.Decode(&cfg)
if err != nil {
return &ProxyManager{}, err
}
for _, pc := range cfg.Proxys {
var pi proxyInst
pi.URL = pc.URL
pi.Weight = pc.Weight
pm.Proxys = append(pm.Proxys, pi)
}
pm.applyWeight()
pm.LoadBalanceMode = cfg.LoadBalanceMode
pm.CacheCleanInterval = cfg.CacheCleanInterval
pm.DynamicWeightFile = cfg.DynamicWeightFile
pm.Cache = make(map[string]int)
go pm.keepClearingCache()
go pm.dynamicWeightLoader()
return &pm, nil
}
func (pm *ProxyManager) DescribeLoadBalanceMode() string {
if pm.LoadBalanceMode == sticky {
return fmt.Sprintf("sticky with cache clearing interval: %ds", pm.CacheCleanInterval)
} else if pm.LoadBalanceMode == cachedShuffle {
return "cached shuffling"
} else {
return "randomize every connection"
}
}
func (pm *ProxyManager) applyWeight() {
// update Chooser and enabledIndics
var chooseArr []wr.Choice
previousEnabledIndics := pm.enabledIndics
pm.enabledIndics = nil
for idx, p := range pm.Proxys {
if p.Weight > 0 {
pm.enabledIndics = append(pm.enabledIndics, idx)
chooseArr = append(chooseArr, wr.Choice{Item: len(pm.enabledIndics) - 1, Weight: uint(p.Weight)})
}
}
if len(pm.enabledIndics) == 0 {
fmt.Println("no enabled indice, proxy will be disabled.")
return
}
fmt.Println("enabled indices:", pm.enabledIndics)
pm.Chooser = wr.NewChooser(chooseArr...)
// update cache to remove disabled proxys
for addr, idx := range pm.Cache {
i := previousEnabledIndics[idx]
pos := sort.SearchInts(pm.enabledIndics, i)
if !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i {
// found in current enabled list
pm.Cache[addr] = pos
} else {
// not found in current enabled list
pm.Cache[addr] = pm.Chooser.Pick().(int)
}
}
}
func (pm *ProxyManager) Get(addr string) (string, bool) {
pm.mux.Lock()
defer pm.mux.Unlock()
// fmt.Println(pm.Cache)
if len(pm.enabledIndics) == 0 {
// return error if no enabled proxy
return "", false
}
if pm.LoadBalanceMode == sticky {
idx, ok := pm.Cache[addr]
if ok {
// fmt.Println("match addr", addr, "using:", idx)
return pm.Proxys[pm.enabledIndics[idx]].URL, true
} else {
idx := pm.Chooser.Pick().(int)
pm.Cache[addr] = idx
return pm.Proxys[pm.enabledIndics[idx]].URL, true
}
} else if pm.LoadBalanceMode == cachedShuffle {
idx, ok := pm.Cache[addr]
if ok {
pm.Cache[addr] = (idx + 1) % (len(pm.enabledIndics))
return pm.Proxys[pm.enabledIndics[pm.Cache[addr]]].URL, true
} else {
idx := pm.Chooser.Pick().(int)
pm.Cache[addr] = idx
return pm.Proxys[pm.enabledIndics[idx]].URL, true
}
} else {
idx := pm.Chooser.Pick().(int)
return pm.Proxys[pm.enabledIndics[idx]].URL, true
}
}
func (pm *ProxyManager) ClearCache() {
// fmt.Println("clearing cache")
pm.mux.Lock()
pm.Cache = make(map[string]int)
pm.mux.Unlock()
// fmt.Println("after:", pm.Cache)
}
func (pm *ProxyManager) keepClearingCache() {
interval := pm.CacheCleanInterval
if interval <= 0 {
return
}
for {
time.Sleep(time.Duration(interval) * time.Second)
pm.ClearCache()
}
}
func (pm *ProxyManager) dynamicWeightLoader() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGTERM)
go func() {
for {
<-sigs
fmt.Println("Loading new weights")
file, err := ioutil.ReadFile(pm.DynamicWeightFile)
if err != nil {
fmt.Println("Error loading new weights, error:", err)
continue
}
dw := dynamicWeight{}
err = json.Unmarshal([]byte(file), &dw)
if err != nil {
fmt.Println("Error loading new weights, json parsing error:", err)
continue
}
// check if number of weights is the same as the number of proxys configured
if len(dw.Weights) != len(pm.Proxys) {
fmt.Println("Error loading new weights, unexpected number of weights")
continue
}
// apply the weights
pm.mux.Lock()
for idx, weight := range dw.Weights {
pm.Proxys[idx].Weight = weight
}
pm.applyWeight()
pm.mux.Unlock()
}
}()
}

View File

@ -0,0 +1,81 @@
package main
import (
"fmt"
"net/http"
"net/url"
"os"
"time"
"golang.org/x/net/proxy"
)
const URL = "http://connectivitycheck.gstatic.com/generate_204"
func proxyTestStatusCode(proxyURL string, URL string, StatusCode int) bool {
// create a socks5 dialer
u, err := url.Parse(proxyURL)
if err != nil {
fmt.Println("error parsing")
return false
}
dialer, err := proxy.FromURL(u, proxy.Direct)
if err != nil {
fmt.Fprintln(os.Stderr, "can't connect to the proxy:", err)
return false
}
// setup a http client
httpTransport := &http.Transport{}
httpClient := &http.Client{
Transport: httpTransport,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
fmt.Printf("redirect %v", *req)
return http.ErrUseLastResponse
},
}
// set our socks5 as the dialer
httpTransport.Dial = dialer.Dial
// create a request
req, err := http.NewRequest("GET", URL, nil)
if err != nil {
fmt.Fprintln(os.Stderr, "can't create request:", err)
return false
}
// use the http client to fetch the page
resp, err := httpClient.Do(req)
if err != nil {
fmt.Fprintln(os.Stderr, "can't GET page:", err)
return false
}
// defer resp.Body.Close()
// b, err := ioutil.ReadAll(resp.Body)
// if err != nil {
// fmt.Fprintln(os.Stderr, "error reading body:", err)
// return false
// }
return resp.StatusCode == StatusCode
}
// testFn := func() bool {
// status := proxyTestStatusCode("socks5://127.0.0.1:1080", URL, 204)
// fmt.Println(status)
// return status
// }
// stop := make(chan bool)
// go testTask(testFn, 5, stop)
// time.Sleep(10 * time.Second)
// stop <- true
// time.Sleep(10 * time.Second)
func testTask(testfn func() bool, interval int, stop chan bool) {
for {
go testfn()
select {
case <-stop:
fmt.Println("stopping")
return
default:
}
time.Sleep(time.Duration(interval) * time.Second)
}
}