1、优化websocket连接

This commit is contained in:
2025-09-19 15:42:51 +08:00
parent 7adba048c8
commit 556a32cb7c
4 changed files with 331 additions and 103 deletions

View File

@ -86,6 +86,10 @@ func run() error {
clearLogJob(db, ctx) clearLogJob(db, ctx)
}) })
//自动重启websocket
utility.SafeGo(func() {
reconnect(ctx)
})
// 等待中断信号以优雅地关闭服务器(设置 5 秒的超时时间) // 等待中断信号以优雅地关闭服务器(设置 5 秒的超时时间)
quit := make(chan os.Signal, 1) quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt) signal.Notify(quit, os.Interrupt)
@ -133,3 +137,18 @@ func clearLogJob(db *gorm.DB, ctx context.Context) {
fileservice.ClearLogs(db) fileservice.ClearLogs(db)
} }
} }
// 定时重连websocket
func reconnect(ctx context.Context) error {
ticker := time.NewTicker(time.Hour * 1)
defer ticker.Stop()
select {
case <-ctx.Done():
return nil
case <-ticker.C:
serverinit.RestartConnect()
}
return nil
}

View File

@ -84,3 +84,44 @@ func UserSubscribeInit(orm *gorm.DB, ctx context.Context) {
} }
}) })
} }
// 重启连接
func RestartConnect() error {
spotSockets := excservice.SpotSockets
futuresSockets := excservice.FutureSockets
timeOut := 22 * time.Hour
for _, item := range spotSockets {
//超过22小时重新连接
if time.Since(item.ConnectTime) > timeOut {
if err := item.ReplaceConnection(); err != nil {
log.Errorf("现货重启连接失败 key:%s,error:%s", item.GetKey(), err)
}
}
}
for _, item := range futuresSockets {
//超过22小时重新连接
if time.Since(item.ConnectTime) > timeOut {
if err := item.ReplaceConnection(); err != nil {
log.Errorf("合约重启连接失败 key:%s,error:%s", item.GetKey(), err)
}
}
}
return nil
}
// 假死重启
func DeadCheck() {
spotSockets := excservice.SpotSockets
futuresSockets := excservice.FutureSockets
for _, item := range spotSockets {
item.DeadCheck()
}
for _, item := range futuresSockets {
item.DeadCheck()
}
}

View File

@ -15,7 +15,7 @@ import (
- @msg 消息内容 - @msg 消息内容
- @listenType 订阅类型 0-现货 1-合约 - @listenType 订阅类型 0-现货 1-合约
*/ */
func ReceiveListen(msg []byte, listenType int) (reconnect bool, err error) { func ReceiveListen(msg []byte, listenType int, apiKey string) (reconnect bool, err error) {
var dataMap map[string]interface{} var dataMap map[string]interface{}
err = sonic.Unmarshal(msg, &dataMap) err = sonic.Unmarshal(msg, &dataMap)

View File

@ -20,6 +20,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/bytedance/sonic" "github.com/bytedance/sonic"
@ -42,7 +43,9 @@ type BinanceWebSocketManager struct {
isStopped bool // 标记 WebSocket 是否已主动停止 isStopped bool // 标记 WebSocket 是否已主动停止
mu sync.Mutex // 用于控制并发访问 isStopped mu sync.Mutex // 用于控制并发访问 isStopped
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
listenKey string // 新增字段 listenKey string // 新增字段
reconnecting atomic.Bool // 防止重复重连
ConnectTime time.Time // 当前连接建立时间
} }
// 已有连接 // 已有连接
@ -72,6 +75,10 @@ func NewBinanceWebSocketManager(wsType int, apiKey, apiSecret, proxyType, proxyA
} }
} }
func (wm *BinanceWebSocketManager) GetKey() string {
return wm.apiKey
}
func (wm *BinanceWebSocketManager) Start() { func (wm *BinanceWebSocketManager) Start() {
utility.SafeGo(wm.run) utility.SafeGo(wm.run)
// wm.run() // wm.run()
@ -91,14 +98,33 @@ func (wm *BinanceWebSocketManager) Restart(apiKey, apiSecret, proxyType, proxyAd
wm.isStopped = false wm.isStopped = false
utility.SafeGo(wm.run) utility.SafeGo(wm.run)
} else { } else {
wm.reconnect <- struct{}{} log.Warnf("调用restart")
wm.triggerReconnect(true)
} }
return wm return wm
} }
// 触发重连
func (wm *BinanceWebSocketManager) triggerReconnect(force bool) {
if force {
wm.reconnecting.Store(false) // 强制重置标志位
}
if wm.reconnecting.CompareAndSwap(false, true) {
log.Warnf("准备重连 key: %s wsType: %v", wm.apiKey, wm.wsType)
// 发送信号触发重连协程
select {
case wm.reconnect <- struct{}{}:
default:
// 防止阻塞,如果通道满了就跳过
log.Debugf("reconnect 信号已存在,跳过 key:%s", wm.apiKey)
}
}
}
func Restart(wm *BinanceWebSocketManager) { func Restart(wm *BinanceWebSocketManager) {
wm.reconnect <- struct{}{} log.Warnf("调用restart")
wm.triggerReconnect(true)
} }
func (wm *BinanceWebSocketManager) run() { func (wm *BinanceWebSocketManager) run() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -201,6 +227,8 @@ func (wm *BinanceWebSocketManager) connect(ctx context.Context) error {
return err return err
} }
// 连接成功,更新连接时间
wm.ConnectTime = time.Now()
log.Info(fmt.Sprintf("已连接到 Binance %s WebSocket【%s】 key:%s", getWsTypeName(wm.wsType), wm.apiKey, listenKey)) log.Info(fmt.Sprintf("已连接到 Binance %s WebSocket【%s】 key:%s", getWsTypeName(wm.wsType), wm.apiKey, listenKey))
// Ping处理 // Ping处理
@ -222,10 +250,88 @@ func (wm *BinanceWebSocketManager) connect(ctx context.Context) error {
return nil return nil
}) })
// utility.SafeGoParam(wm.restartConnect, ctx)
utility.SafeGo(func() { wm.startListenKeyRenewal2(ctx) }) utility.SafeGo(func() { wm.startListenKeyRenewal2(ctx) })
utility.SafeGo(func() { wm.readMessages(ctx) }) utility.SafeGo(func() { wm.readMessages(ctx) })
utility.SafeGo(func() { wm.handleReconnect(ctx) }) utility.SafeGo(func() { wm.handleReconnect(ctx) })
utility.SafeGo(func() { wm.startPingLoop(ctx) })
// utility.SafeGo(func() { wm.startDeadCheck(ctx) })
return nil
}
// ReplaceConnection 创建新连接并关闭旧连接,实现无缝连接替换
func (wm *BinanceWebSocketManager) ReplaceConnection() error {
wm.mu.Lock()
if wm.isStopped {
wm.mu.Unlock()
return errors.New("WebSocket 已停止")
}
oldCtxCancel := wm.cancelFunc
oldConn := wm.ws
wm.mu.Unlock()
log.Infof("🔄 正在替换连接: %s", wm.apiKey)
// 步骤 1先获取新的 listenKey 和连接
newListenKey, err := wm.getListenKey()
if err != nil {
return fmt.Errorf("获取新 listenKey 失败: %w", err)
}
dialer, err := wm.getDialer()
if err != nil {
return err
}
newURL := fmt.Sprintf("%s/%s", wm.url, newListenKey)
newConn, _, err := dialer.Dial(newURL, nil)
if err != nil {
return fmt.Errorf("新连接 Dial 失败: %w", err)
}
// 步骤 2创建新的上下文并启动协程
newCtx, newCancel := context.WithCancel(context.Background())
// 设置 ping handler
newConn.SetPingHandler(func(appData string) error {
log.Infof("收到 Ping新连接 key:%s msg:%s", wm.apiKey, appData)
for x := 0; x < 5; x++ {
if err := newConn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(10*time.Second)); err != nil {
log.Errorf("Pong 失败 %d 次 err:%v", x, err)
time.Sleep(time.Second)
continue
}
break
}
setLastTime(wm)
return nil
})
// 步骤 3安全切换连接
wm.mu.Lock()
wm.ws = newConn
wm.listenKey = newListenKey
wm.ConnectTime = time.Now()
wm.cancelFunc = newCancel
wm.mu.Unlock()
log.Infof("✅ 替换连接成功: %s listenKey: %s", wm.apiKey, newListenKey)
// 步骤 4启动新连接协程
go wm.startListenKeyRenewal2(newCtx)
go wm.readMessages(newCtx)
go wm.handleReconnect(newCtx)
go wm.startPingLoop(newCtx)
// go wm.startDeadCheck(newCtx)
// 步骤 5关闭旧连接、取消旧协程
if oldCtxCancel != nil {
oldCtxCancel()
}
if oldConn != nil {
_ = oldConn.Close()
log.Infof("🔒 旧连接已关闭: %s", wm.apiKey)
}
return nil return nil
} }
@ -251,6 +357,7 @@ func setLastTime(wm *BinanceWebSocketManager) {
if val != "" { if val != "" {
helper.DefaultRedis.SetString(subKey, val) helper.DefaultRedis.SetString(subKey, val)
} }
} }
func (wm *BinanceWebSocketManager) getDialer() (*websocket.Dialer, error) { func (wm *BinanceWebSocketManager) getDialer() (*websocket.Dialer, error) {
@ -357,7 +464,8 @@ func (wm *BinanceWebSocketManager) readMessages(ctx context.Context) {
_, msg, err := wm.ws.ReadMessage() _, msg, err := wm.ws.ReadMessage()
if err != nil && strings.Contains(err.Error(), "websocket: close") { if err != nil && strings.Contains(err.Error(), "websocket: close") {
if !wm.isStopped { if !wm.isStopped {
wm.reconnect <- struct{}{} log.Error("收到关闭消息", err.Error())
wm.triggerReconnect(false)
} }
log.Error("websocket 关闭") log.Error("websocket 关闭")
@ -375,11 +483,13 @@ func (wm *BinanceWebSocketManager) readMessages(ctx context.Context) {
func (wm *BinanceWebSocketManager) handleOrderUpdate(msg []byte) { func (wm *BinanceWebSocketManager) handleOrderUpdate(msg []byte) {
setLastTime(wm) setLastTime(wm)
if reconnect, _ := ReceiveListen(msg, wm.wsType); reconnect { if reconnect, _ := ReceiveListen(msg, wm.wsType, wm.apiKey); reconnect {
wm.reconnect <- struct{}{} log.Errorf("收到重连请求")
wm.triggerReconnect(false)
} }
} }
// Stop 安全停止 WebSocket
func (wm *BinanceWebSocketManager) Stop() { func (wm *BinanceWebSocketManager) Stop() {
wm.mu.Lock() wm.mu.Lock()
defer wm.mu.Unlock() defer wm.mu.Unlock()
@ -387,9 +497,8 @@ func (wm *BinanceWebSocketManager) Stop() {
if wm.isStopped { if wm.isStopped {
return return
} }
wm.isStopped = true wm.isStopped = true
// 关闭 stopChannel确保已经关闭避免 panic
select { select {
case <-wm.stopChannel: case <-wm.stopChannel:
default: default:
@ -398,108 +507,182 @@ func (wm *BinanceWebSocketManager) Stop() {
if wm.cancelFunc != nil { if wm.cancelFunc != nil {
wm.cancelFunc() wm.cancelFunc()
wm.cancelFunc = nil
} }
if wm.ws != nil { if wm.ws != nil {
if err := wm.ws.Close(); err != nil { if err := wm.ws.Close(); err != nil {
log.Error(fmt.Sprintf("key【%s】close失败", wm.apiKey), err) log.Errorf("WebSocket Close 错误 key:%s err:%v", wm.apiKey, err)
} else {
log.Info(fmt.Sprintf("key【%s】close", wm.apiKey))
} }
wm.ws = nil
} }
// **重新创建 stopChannel避免 Restart() 时无效** log.Infof("WebSocket 已完全停止 key:%s", wm.apiKey)
wm.stopChannel = make(chan struct{}) wm.stopChannel = make(chan struct{}, 10)
} }
// 重连机制 // handleReconnect 使用指数退避并保持永不退出
func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) { func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) {
maxRetries := 100 // 最大重试次数 const maxRetries = 100
baseDelay := time.Second * 2
retryCount := 0 retryCount := 0
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Infof("handleReconnect context done: %s", wm.apiKey)
return return
case <-wm.reconnect: case <-wm.reconnect:
wm.mu.Lock()
if wm.isStopped { if wm.isStopped {
wm.mu.Unlock()
return return
} }
wm.mu.Unlock()
log.Warn("WebSocket 连接断开,尝试重连...") log.Warnf("WebSocket 连接断开,准备重连 key:%s", wm.apiKey)
if wm.ws != nil {
wm.ws.Close()
}
// 取消旧的上下文
if wm.cancelFunc != nil {
wm.cancelFunc()
}
for { for {
wm.mu.Lock()
if wm.ws != nil {
_ = wm.ws.Close()
wm.ws = nil
}
if wm.cancelFunc != nil {
wm.cancelFunc()
wm.cancelFunc = nil
}
wm.mu.Unlock()
newCtx, cancel := context.WithCancel(context.Background()) newCtx, cancel := context.WithCancel(context.Background())
wm.cancelFunc = cancel // 更新 cancelFunc wm.mu.Lock()
wm.cancelFunc = cancel
wm.mu.Unlock()
if err := wm.connect(newCtx); err != nil { if err := wm.connect(newCtx); err != nil {
log.Errorf("重连失败: %v", err) log.Errorf("🔌 重连失败%d/%dkey:%serr: %v", retryCount+1, maxRetries, wm.apiKey, err)
cancel() cancel()
retryCount++ retryCount++
if retryCount >= maxRetries { if retryCount >= maxRetries {
log.Error("重连失败次数过多,退出重连逻辑") log.Errorf("重连失败次数过多,停止重连逻辑 key:%s", wm.apiKey)
wm.reconnecting.Store(false)
return return
} }
time.Sleep(5 * time.Second) delay := baseDelay * time.Duration(1<<retryCount)
if delay > time.Minute*5 {
delay = time.Minute * 5
}
log.Warnf("等待 %v 后重试...", delay)
time.Sleep(delay)
continue continue
} }
log.Infof("✅ 重连成功 key:%s", wm.apiKey)
retryCount = 0 retryCount = 0
wm.reconnecting.Store(false)
// ✅ 重连成功后开启假死检测
utility.SafeGo(func() { wm.startDeadCheck(newCtx) })
break
}
}
}
}
// startDeadCheck 替代 Start 中的定时器,绑定连接生命周期
func (wm *BinanceWebSocketManager) startDeadCheck(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if wm.isStopped {
return return
} }
wm.DeadCheck()
}
}
}
// 假死检测
func (wm *BinanceWebSocketManager) DeadCheck() {
subKey := fmt.Sprintf(global.USER_SUBSCRIBE, wm.apiKey)
val, _ := helper.DefaultRedis.GetString(subKey)
if val == "" {
log.Warnf("没有订阅信息,无法进行假死检测")
return
}
var data binancedto.UserSubscribeState
_ = sonic.Unmarshal([]byte(val), &data)
var lastTime *time.Time
if wm.wsType == 0 {
lastTime = data.SpotLastTime
} else {
lastTime = data.FuturesLastTime
}
// 定义最大静默时间(超出视为假死)
var timeout time.Duration
if wm.wsType == 0 {
timeout = 40 * time.Second // Spot 每 20s ping40s 足够
} else {
timeout = 6 * time.Minute // Futures 每 3 分钟 ping
}
if lastTime != nil && time.Since(*lastTime) > timeout {
log.Warnf("检测到假死连接 key:%s type:%v, 距离上次通信: %v, 触发重连", wm.apiKey, wm.wsType, time.Since(*lastTime))
wm.triggerReconnect(true)
}
}
// 主动心跳发送机制
func (wm *BinanceWebSocketManager) startPingLoop(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if wm.isStopped {
return
}
err := wm.ws.WriteMessage(websocket.PingMessage, []byte("ping"))
if err != nil {
log.Error("主动 Ping Binance 失败:", err)
wm.triggerReconnect(false)
}
} }
} }
} }
// 定期删除listenkey 并重启ws // 定期删除listenkey 并重启ws
func (wm *BinanceWebSocketManager) startListenKeyRenewal(ctx context.Context, listenKey string) { // func (wm *BinanceWebSocketManager) startListenKeyRenewal(ctx context.Context, listenKey string) {
time.Sleep(30 * time.Minute) // time.Sleep(30 * time.Minute)
select { // select {
case <-ctx.Done(): // case <-ctx.Done():
return // return
default: // default:
if err := wm.deleteListenKey(listenKey); err != nil { // if err := wm.deleteListenKey(listenKey); err != nil {
log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err) // log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
} else { // } else {
log.Debug("Successfully delete listenKey") // log.Debug("Successfully delete listenKey")
wm.reconnect <- struct{}{} // wm.triggerReconnect()
} // }
} // }
// ticker := time.NewTicker(5 * time.Minute) // }
// defer ticker.Stop()
// for {
// select {
// case <-ticker.C:
// if wm.isStopped {
// return
// }
// if err := wm.deleteListenKey(listenKey); err != nil {
// log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
// } else {
// log.Debug("Successfully delete listenKey")
// wm.reconnect <- struct{}{}
// return
// }
// case <-ctx.Done():
// return
// }
// }
}
// 定时续期 // 定时续期
func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) { func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) {
@ -528,49 +711,34 @@ func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) {
/* /*
删除listenkey 删除listenkey
*/ */
func (wm *BinanceWebSocketManager) deleteListenKey(listenKey string) error { // func (wm *BinanceWebSocketManager) deleteListenKey(listenKey string) error {
client, err := wm.createBinanceClient() // client, err := wm.createBinanceClient()
if err != nil { // if err != nil {
return err // return err
} // }
var resp []byte // var resp []byte
switch wm.wsType { // switch wm.wsType {
case 0: // case 0:
path := fmt.Sprintf("/api/v3/userDataStream") // path := fmt.Sprintf("/api/v3/userDataStream")
params := map[string]interface{}{ // params := map[string]interface{}{
"listenKey": listenKey, // "listenKey": listenKey,
} // }
resp, _, err = client.SendSpotRequestByKey(path, "DELETE", params) // resp, _, err = client.SendSpotRequestByKey(path, "DELETE", params)
log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp))) // log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
case 1: // case 1:
resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "DELETE", nil) // resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "DELETE", nil)
log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp))) // log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
default: // default:
return errors.New("unknown ws type") // return errors.New("unknown ws type")
} // }
return err // return err
} // }
func (wm *BinanceWebSocketManager) renewListenKey(listenKey string) error { func (wm *BinanceWebSocketManager) renewListenKey(listenKey string) error {
// payloadParam := map[string]interface{}{
// "listenKey": listenKey,
// "apiKey": wm.apiKey,
// }
// params := map[string]interface{}{
// "id": getUUID(),
// "method": "userDataStream.ping",
// "params": payloadParam,
// }
// if err := wm.ws.WriteJSON(params); err != nil {
// return err
// }
// wm.ws.WriteJSON()
client, err := wm.createBinanceClient() client, err := wm.createBinanceClient()
if err != nil { if err != nil {
return err return err