Skip to content

Commit

Permalink
add holdConnection method
Browse files Browse the repository at this point in the history
  • Loading branch information
zoujiaqing committed Sep 10, 2024
1 parent 002741f commit 0c99c62
Showing 1 changed file with 62 additions and 13 deletions.
75 changes: 62 additions & 13 deletions Client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Client struct {
reconnectCh chan struct{}
reconnectErr chan error
pingInterval time.Duration
pingTimer *time.Timer
pingTicker *time.Ticker
pingValue int
timeout uint // 超时时间(秒)
localIP string // 本地网卡 IP
Expand All @@ -38,7 +38,7 @@ func NewClient(addr string, port uint) *Client {
client := &Client{
addr: addr,
port: port,
RetryDelay: 1, // 如果断联1秒后重试
RetryDelay: 3, // 如果断联1秒后重试
reconnectCh: make(chan struct{}),
reconnectErr: make(chan error),
pingInterval: 1 * time.Second, // 默认每 1 秒发送一次 Ping 请求
Expand All @@ -51,9 +51,10 @@ func NewClient(addr string, port uint) *Client {
}

func (c *Client) stopPing() {
log.Printf("Stop timer for ping.")
c.pingValue = -1
c.pingTimer.Stop()
if c.pingTicker != nil {
log.Printf("Stop timer for ping.")
c.pingTicker.Stop() // 停止Ticker
}
}

func (c *Client) SetTimeout(timeout uint) {
Expand Down Expand Up @@ -182,21 +183,37 @@ func (c *Client) GetPingValue() int {
return c.pingValue
}

var pingTimerStarted bool = false

func (c *Client) startPingTimer() {
if pingTimerStarted {
log.Println("Ping 计时器已经运行,避免重复启动")
return
}

pingTimerStarted = true
log.Printf("Start timer for ping.")
c.pingTimer = time.NewTimer(c.pingInterval)
ticker := time.NewTicker(c.pingInterval)
c.pingTicker = ticker
go func() {
for {
select {
case <-c.pingTimer.C:
case <-ticker.C:
c.pingValue = -1
if !c.connected {
log.Printf("连接已关闭,停止PING")
return
}
log.Printf("持续PING")
c.ping()
c.pingTimer.Reset(c.pingInterval) // 重新设置定时器
case <-c.reconnectCh:
// 停止定时器
c.stopPing()
c.pingValue = -1
log.Printf("停止定时器")
ticker.Stop() // 停止Ticker
return
}
}
pingTimerStarted = false
}()
}

Expand All @@ -205,10 +222,11 @@ func (c *Client) ping() {
if err != nil {
log.Printf("Ping error: %v", err)
c.pingValue = -1
// 处理 Ping 错误,可以重连或者其他处理
} else {
c.pingValue = pingValue
return
}

c.pingValue = pingValue
log.Printf("Ping: %d", c.pingValue)
}

func (c *Client) handleMessage(conn *RpcConnection, data []byte) []byte {
Expand All @@ -218,6 +236,30 @@ func (c *Client) handleMessage(conn *RpcConnection, data []byte) []byte {
return nil
}

func (c *Client) holdReconnection() {
attempt := 0
for {
err := c.Connect()
if err == nil {
log.Printf("重连成功")
// 重连成功,重置 attempt
attempt = 0
return
}

attempt++
log.Printf("重连失败,继续尝试,第 %d 次", attempt)

// 如果连续失败 3 次,延长重连间隔
if attempt >= 3 {
log.Printf("多次重连失败,等待更长时间再尝试")
time.Sleep(c.RetryDelay * 5)
} else {
time.Sleep(c.RetryDelay)
}
}
}

func (c *Client) handleConnectionClosed(conn *RpcConnection) {
select {
case <-c.reconnectCh:
Expand All @@ -232,6 +274,13 @@ func (c *Client) handleConnectionClosed(conn *RpcConnection) {

// 停止定时器
c.stopPing()
c.pingValue = -1
pingTimerStarted = false // 连接丢失时重置

// 保持重连
go func() {
c.holdReconnection()
}()
}
}

Expand Down

0 comments on commit 0c99c62

Please sign in to comment.