Skip to content

Commit

Permalink
Merge pull request #2 from piaodazhu/dev
Browse files Browse the repository at this point in the history
debug & unit tests
  • Loading branch information
piaodazhu authored Aug 17, 2023
2 parents 9f9f6e2 + fd5d40f commit 8619b53
Show file tree
Hide file tree
Showing 12 changed files with 639 additions and 51 deletions.
2 changes: 1 addition & 1 deletion README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

那为什么不编写一个包来使这变得更加优雅呢?proxylite 应运而生。它的主要特点如下:
1. 易于集成到代码中。提供了服务器和客户端结构。只需导入此包,然后在需要时注册隧道。
2. 动态按需反向代理**一次注册,一个端口,一个用户,一个 TCP 连接。**
2. 动态按需反向代理,带有在线用户数量控制。
3. 服务注册和发现。
4. 支持自定义钩子。(开发中)

Expand Down
6 changes: 4 additions & 2 deletions TODO
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
- [ ] Hook API
- [ ] Unit Tests
- [x] Unit Tests
- [ ] Code Report
- [ ] Benchmark
- [ ] Benchmark
- [x] Register arguments: maxConn, maxServeNum, maxServeTime
- [ ] Active user status
54 changes: 28 additions & 26 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
type RegisterEntry struct {
// Basic Info
Info RegisterInfo
// Control Info
Ctrl ControlInfo
// Cancel function
Cancel func()
// Done channel
Expand All @@ -26,8 +28,8 @@ type ProxyLiteClient struct {
ready bool
serverAddr string
lock sync.RWMutex
avaliablePorts map[int]struct{}
registered map[int]*RegisterEntry
avaliablePorts map[uint32]struct{}
registered map[uint32]*RegisterEntry
logger *log.Logger
}

Expand All @@ -36,11 +38,11 @@ func NewProxyLiteClient(serverAddr string) *ProxyLiteClient {
client := &ProxyLiteClient{
ready: false,
serverAddr: serverAddr,
avaliablePorts: map[int]struct{}{},
registered: map[int]*RegisterEntry{},
avaliablePorts: map[uint32]struct{}{},
registered: map[uint32]*RegisterEntry{},
logger: log.New(),
}
client.logger.SetReportCaller(true)
// client.logger.SetReportCaller(true)

ports, err := AskFreePort(serverAddr)
if err != nil {
Expand All @@ -54,7 +56,7 @@ func NewProxyLiteClient(serverAddr string) *ProxyLiteClient {
}

// AvaliablePorts Get avaliable ports from proxy server.
func (c *ProxyLiteClient) AvaliablePorts() ([]int, bool) {
func (c *ProxyLiteClient) AvaliablePorts() ([]uint32, bool) {
ports, err := AskFreePort(c.serverAddr)
if err != nil {
c.ready = false
Expand All @@ -68,7 +70,7 @@ func (c *ProxyLiteClient) AvaliablePorts() ([]int, bool) {
}

// AnyPort Get a random avaliable port from proxy server.
func (c *ProxyLiteClient) AnyPort() (int, bool) {
func (c *ProxyLiteClient) AnyPort() (uint32, bool) {
if c.ready {
for port := range c.avaliablePorts {
return port, true
Expand All @@ -92,12 +94,10 @@ func (c *ProxyLiteClient) ActiveServices() ([]ServiceInfo, error) {
return DiscoverServices(c.serverAddr)
}

func register(conn net.Conn, info RegisterInfo) error {
func register(conn net.Conn, info RegisterInfo, ctrl ControlInfo) error {
req := RegisterServiceReq{
Info: RegisterInfo{
OuterPort: info.OuterPort,
InnerAddr: info.InnerAddr,
},
Info: info,
Ctrl: ctrl,
}
raw, _ := json.Marshal(req)
err := sendMessage(conn, TypeRegisterServiceReq, raw)
Expand Down Expand Up @@ -130,33 +130,33 @@ func (c *ProxyLiteClient) SetLogger(logger *log.Logger) {
}

// RegisterInnerService Register inner server to proxy server's outer port.
func (c *ProxyLiteClient) RegisterInnerService(info RegisterInfo) error {
func (c *ProxyLiteClient) RegisterInnerService(info RegisterInfo, ctrl ControlInfo) (func(), chan struct{}, error) {
if !c.ready {
return errors.New("client not ready")
return nil, nil, errors.New("client not ready")
}
c.lock.Lock()
defer c.lock.Unlock()
if _, exists := c.registered[info.OuterPort]; exists {
return errors.New("ports is occupied")
return nil, nil, errors.New("ports is occupied")
}

serverConn, err := net.Dial("tcp", c.serverAddr)
if err != nil {
c.ready = false
return err
return nil, nil, err
}

// Register the service must return nil.
err = register(serverConn, info)
err = register(serverConn, info, ctrl)
c.logTunnelMessage(info.Name, "REGISTER", fmt.Sprint("err=", err))
if err != nil {
serverConn.Close()
return err
return nil, nil, err
}

binder := newConnUidBinder(0)
doOnce := sync.Once{}
done := make(chan struct{})
done := make(chan struct{}, 1)

var totalOut, totalIn uint64
closeTunnel := func() {
Expand All @@ -174,6 +174,7 @@ func (c *ProxyLiteClient) RegisterInnerService(info RegisterInfo) error {
// Now we can add the new entry to registered table.
c.registered[info.OuterPort] = &RegisterEntry{
Info: info,
Ctrl: ctrl,
Cancel: cancelFunc,
Done: done,
}
Expand Down Expand Up @@ -220,19 +221,18 @@ func (c *ProxyLiteClient) RegisterInnerService(info RegisterInfo) error {
var data []byte
var uid uint32
var innerConn *net.Conn
var newConn net.Conn
var ok, alive bool
var err error
for {
mtype, data, err = recvMessageWithBuffer(serverConn, buf)
if err != nil || mtype != TypeDataSegment {
log.Error(err, mtype)
c.logger.Error(err, mtype)
break
}
uid, alive = readUidUnsafe(data)
if innerConn, ok = binder.getConn(uid); !ok {
// Blocking. can be optimized.
newConn, err = net.Dial("tcp", info.InnerAddr)
newConn, err := net.Dial("tcp", info.InnerAddr)
if err != nil {
// should close this client. leave it to below process.
readFromService(nil, uid)
Expand Down Expand Up @@ -269,7 +269,7 @@ func (c *ProxyLiteClient) RegisterInnerService(info RegisterInfo) error {
delete(c.registered, info.OuterPort)
c.lock.Unlock()
}()
return nil
return cancelFunc, done, nil
}

// GetRegisterEntryByName Get RegisterEntry
Expand All @@ -288,7 +288,7 @@ func (c *ProxyLiteClient) GetRegisterEntryByName(name string) (*RegisterEntry, b
func (c *ProxyLiteClient) GetRegisterEntryByPort(port int) (*RegisterEntry, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
entry, exists := c.registered[port]
entry, exists := c.registered[uint32(port)]
return entry, exists
}

Expand All @@ -297,7 +297,7 @@ func (c *ProxyLiteClient) logTunnelMessage(service, header, msg string) {
}

// AskFreePort Ask avaliable free port from proxy server with given address.
func AskFreePort(addr string) ([]int, error) {
func AskFreePort(addr string) ([]uint32, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
Expand All @@ -324,7 +324,9 @@ func AskFreePort(addr string) ([]int, error) {
return nil, err
}

sort.Ints(rsp.Ports)
sort.Slice(rsp.Ports, func(i, j int) bool {
return rsp.Ports[i] < rsp.Ports[j]
})
return rsp.Ports, nil
}

Expand Down
2 changes: 2 additions & 0 deletions example/client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
1 change: 1 addition & 0 deletions example/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func main() {
Name: "redis",
Message: "redis kv",
},
proxylite.ControlInfo{},
))

log.Print(client.AvaliablePorts())
Expand Down
2 changes: 2 additions & 0 deletions example/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ require (
github.com/cespare/xxhash/v2 v2.2.0
github.com/google/uuid v1.3.0
github.com/sirupsen/logrus v1.9.3
golang.org/x/sync v0.3.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
16 changes: 12 additions & 4 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,28 @@ type AskFreePortReq struct{}

// AskFreePortRsp Ask avaliable free ports response
type AskFreePortRsp struct {
Ports []int
Ports []uint32
}

// RegisterInfo Register information
type RegisterInfo struct {
OuterPort int
OuterPort uint32
InnerAddr string
Name string
Message string
}

// ControlInfo Register Controlling information
type ControlInfo struct {
MaxServeTime uint32
MaxServeConn uint32
MaxServeCount uint32
}

// RegisterServiceReq inner service registration request
type RegisterServiceReq struct {
Info RegisterInfo
Ctrl ControlInfo
}

const (
Expand All @@ -52,7 +60,7 @@ const (

// RegisterServiceRsp inner service registration response
type RegisterServiceRsp struct {
Code int
Code int32
}

// AskServiceReq Service discovery request
Expand All @@ -62,7 +70,7 @@ type AskServiceReq struct {

// ServiceInfo Service basic information
type ServiceInfo struct {
Port int
Port uint32
Name string
Message string
Busy bool
Expand Down
Loading

0 comments on commit 8619b53

Please sign in to comment.