diff --git a/cmd/usersubscribe/usersubscribe.go b/cmd/usersubscribe/usersubscribe.go
index efb1da7..429c09a 100644
--- a/cmd/usersubscribe/usersubscribe.go
+++ b/cmd/usersubscribe/usersubscribe.go
@@ -86,6 +86,10 @@ func run() error {
 		clearLogJob(db, ctx)
 	})
 
+	// Periodically replace long-lived websocket connections.
+	utility.SafeGo(func() {
+		reconnect(ctx)
+	})
 	// 等待中断信号以优雅地关闭服务器(设置 5 秒的超时时间)
 	quit := make(chan os.Signal, 1)
 	signal.Notify(quit, os.Interrupt)
@@ -133,3 +137,21 @@ func clearLogJob(db *gorm.DB, ctx context.Context) {
 		fileservice.ClearLogs(db)
 	}
 }
+
+// reconnect wakes up once an hour and asks serverinit to replace websocket
+// connections that have exceeded their age limit. It blocks until ctx is
+// cancelled and always returns nil.
+func reconnect(ctx context.Context) error {
+	ticker := time.NewTicker(time.Hour * 1)
+	defer ticker.Stop()
+
+	// Loop so the check repeats on every tick; a bare select fires only once.
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-ticker.C:
+			serverinit.RestartConnect()
+		}
+	}
+}
diff --git a/config/serverinit/usersubscribeinit.go b/config/serverinit/usersubscribeinit.go
index 465e908..c58f569 100644
--- a/config/serverinit/usersubscribeinit.go
+++ b/config/serverinit/usersubscribeinit.go
@@ -84,3 +84,44 @@ func UserSubscribeInit(orm *gorm.DB, ctx context.Context) {
 		}
 	})
 }
+
+// RestartConnect replaces every spot and futures connection older than 22 hours; failures are logged and nil is returned.
+func RestartConnect() error {
+	spotSockets := excservice.SpotSockets
+	futuresSockets := excservice.FutureSockets
+	timeOut := 22 * time.Hour
+
+	for _, item := range spotSockets {
+		// Older than 22 hours: replace the connection.
+		if time.Since(item.ConnectTime) > timeOut {
+			if err := item.ReplaceConnection(); err != nil {
+				log.Errorf("现货重启连接失败 key:%s,error:%s", item.GetKey(), err)
+			}
+		}
+	}
+
+	for _, item := range futuresSockets {
+		// Older than 22 hours: replace the connection.
+		if time.Since(item.ConnectTime) > timeOut {
+			if err := item.ReplaceConnection(); err != nil {
+				log.Errorf("合约重启连接失败 key:%s,error:%s", item.GetKey(), err)
+			}
+		}
+	}
+
+	return nil
+}
+
+// DeadCheck runs the per-socket DeadCheck on every spot and futures socket.
+func DeadCheck() {
+	spotSockets := excservice.SpotSockets
+	futuresSockets := excservice.FutureSockets
+
+	for _, item := range spotSockets {
+		item.DeadCheck()
+	}
+
+	for _, item := range futuresSockets {
+		item.DeadCheck()
+	}
+}
diff --git a/services/excservice/binancereceive.go 
b/services/excservice/binancereceive.go
index 24d3cba..f2a5fb5 100644
--- a/services/excservice/binancereceive.go
+++ b/services/excservice/binancereceive.go
@@ -15,7 +15,7 @@ import (
   - @msg 消息内容
   - @listenType 订阅类型 0-现货 1-合约
 */
-func ReceiveListen(msg []byte, listenType int) (reconnect bool, err error) {
+func ReceiveListen(msg []byte, listenType int, apiKey string) (reconnect bool, err error) {
 	var dataMap map[string]interface{}
 	err = sonic.Unmarshal(msg, &dataMap)
 
diff --git a/services/excservice/binancesocketmanager.go b/services/excservice/binancesocketmanager.go
index 1059e08..139cdd3 100644
--- a/services/excservice/binancesocketmanager.go
+++ b/services/excservice/binancesocketmanager.go
@@ -20,6 +20,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/bytedance/sonic"
@@ -42,7 +43,9 @@ type BinanceWebSocketManager struct {
 	isStopped   bool       // 标记 WebSocket 是否已主动停止
 	mu          sync.Mutex // 用于控制并发访问 isStopped
 	cancelFunc  context.CancelFunc
-	listenKey   string // 新增字段
+	listenKey    string      // newly added field
+	reconnecting atomic.Bool // guards against duplicate reconnect attempts
+	ConnectTime  time.Time   // time the current connection was established
 }
 
 // 已有连接
@@ -72,6 +75,10 @@ func NewBinanceWebSocketManager(wsType int, apiKey, apiSecret, proxyType, proxyA
 	}
 }
 
+func (wm *BinanceWebSocketManager) GetKey() string {
+	return wm.apiKey
+}
+
 func (wm *BinanceWebSocketManager) Start() {
 	utility.SafeGo(wm.run)
 	// wm.run()
@@ -91,14 +98,38 @@ func (wm *BinanceWebSocketManager) Restart(apiKey, apiSecret, proxyType, proxyAd
 		wm.isStopped = false
 		utility.SafeGo(wm.run)
 	} else {
-		wm.reconnect <- struct{}{}
+		log.Warnf("调用restart")
+		wm.triggerReconnect(true)
 	}
 
 	return wm
 }
 
+// triggerReconnect asks the handleReconnect goroutine to rebuild the connection.
+// The reconnecting flag collapses concurrent requests into a single signal;
+// force clears the flag first so the request is never suppressed by it.
+func (wm *BinanceWebSocketManager) triggerReconnect(force bool) {
+	if force {
+		wm.reconnecting.Store(false) // forcibly reset the flag
+	}
+
+	if wm.reconnecting.CompareAndSwap(false, true) {
+		log.Warnf("准备重连 key: %s wsType: %v", wm.apiKey, wm.wsType)
+		// Signal the reconnect goroutine without blocking the caller.
+		select {
+		case wm.reconnect <- struct{}{}:
+		default:
+			// The signal was dropped (channel full or no receiver ready), so
+			// release the flag; otherwise every later non-forced request
+			// would be ignored until something else resets it.
+			wm.reconnecting.Store(false)
+			log.Debugf("reconnect 信号已存在,跳过 key:%s", wm.apiKey)
+		}
+	}
+}
 func Restart(wm *BinanceWebSocketManager) {
-	wm.reconnect <- struct{}{}
+	log.Warnf("调用restart")
+	wm.triggerReconnect(true)
 }
 func (wm *BinanceWebSocketManager) run() {
 	ctx, cancel := context.WithCancel(context.Background())
@@ -201,6 +232,8 @@ func (wm *BinanceWebSocketManager) connect(ctx context.Context) error {
 		return err
 	}
 
+	// Connected: record when this connection was established.
+	wm.ConnectTime = time.Now()
 	log.Info(fmt.Sprintf("已连接到 Binance %s WebSocket【%s】 key:%s", getWsTypeName(wm.wsType), wm.apiKey, listenKey))
 
 	// Ping处理
@@ -222,10 +255,88 @@ func (wm *BinanceWebSocketManager) connect(ctx context.Context) error {
 		return nil
 	})
 
-	// utility.SafeGoParam(wm.restartConnect, ctx)
 	utility.SafeGo(func() { wm.startListenKeyRenewal2(ctx) })
 	utility.SafeGo(func() { wm.readMessages(ctx) })
 	utility.SafeGo(func() { wm.handleReconnect(ctx) })
+	utility.SafeGo(func() { wm.startPingLoop(ctx) })
+	// utility.SafeGo(func() { wm.startDeadCheck(ctx) })
+
+	return nil
+}
+
+// ReplaceConnection dials a new connection first and only then closes the old one, so the swap leaves no gap.
+func (wm *BinanceWebSocketManager) ReplaceConnection() error {
+	wm.mu.Lock()
+	if wm.isStopped {
+		wm.mu.Unlock()
+		return errors.New("WebSocket 已停止")
+	}
+	oldCtxCancel := wm.cancelFunc
+	oldConn := wm.ws
+	wm.mu.Unlock()
+
+	log.Infof("🔄 正在替换连接: %s", wm.apiKey)
+
+	// Step 1: obtain a fresh listenKey and dial the new connection.
+	newListenKey, err := wm.getListenKey()
+	if err != nil {
+		return fmt.Errorf("获取新 listenKey 失败: %w", err)
+	}
+
+	dialer, err := wm.getDialer()
+	if err != nil {
+		return err
+	}
+
+	newURL := fmt.Sprintf("%s/%s", wm.url, newListenKey)
+	newConn, _, err := dialer.Dial(newURL, nil)
+	if err != nil {
+		return fmt.Errorf("新连接 Dial 失败: %w", err)
+	}
+
+	// Step 2: create the context that owns the new connection's goroutines.
+	newCtx, newCancel := context.WithCancel(context.Background())
+
+	// Answer server pings with a pong, retrying up to five times.
+	newConn.SetPingHandler(func(appData string) error {
+		log.Infof("收到 Ping(新连接) key:%s msg:%s", wm.apiKey, appData)
+		for x := 0; x < 5; x++ {
+			if err := newConn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(10*time.Second)); err != nil {
+				log.Errorf("Pong 失败 %d 次 err:%v", x, err)
+				time.Sleep(time.Second)
+				continue
+			}
+			break
+		}
+		setLastTime(wm)
+		return nil
+	})
+
+	// Step 3: swap the connection state under the lock.
+	wm.mu.Lock()
+	wm.ws = newConn
+	wm.listenKey = newListenKey
+	wm.ConnectTime = time.Now()
+	wm.cancelFunc = newCancel
+	wm.mu.Unlock()
+
+	log.Infof("✅ 替换连接成功: %s listenKey: %s", wm.apiKey, newListenKey)
+
+	// Step 4: start the new connection's goroutines the same way connect does.
+	utility.SafeGo(func() { wm.startListenKeyRenewal2(newCtx) })
+	utility.SafeGo(func() { wm.readMessages(newCtx) })
+	utility.SafeGo(func() { wm.handleReconnect(newCtx) })
+	utility.SafeGo(func() { wm.startPingLoop(newCtx) })
+	// utility.SafeGo(func() { wm.startDeadCheck(newCtx) })
+
+	// Step 5: cancel the old goroutines and close the old connection.
+	if oldCtxCancel != nil {
+		oldCtxCancel()
+	}
+	if oldConn != nil {
+		_ = oldConn.Close()
+		log.Infof("🔒 旧连接已关闭: %s", wm.apiKey)
+	}
 
 	return nil
 }
@@ -251,6 +362,7 @@ func setLastTime(wm *BinanceWebSocketManager) {
 	if val != "" {
 		helper.DefaultRedis.SetString(subKey, val)
 	}
+
 }
 
 func (wm *BinanceWebSocketManager) getDialer() (*websocket.Dialer, error) {
@@ -357,7 +469,8 @@ func (wm *BinanceWebSocketManager) readMessages(ctx context.Context) {
 			_, msg, err := wm.ws.ReadMessage()
 			if err != nil && strings.Contains(err.Error(), "websocket: close") {
 				if !wm.isStopped {
-					wm.reconnect <- struct{}{}
+					log.Error("收到关闭消息", err.Error())
+					wm.triggerReconnect(false)
 				}
 
 				log.Error("websocket 关闭")
@@ -375,11 +488,13 @@ func (wm *BinanceWebSocketManager) readMessages(ctx context.Context) {
 
 func (wm *BinanceWebSocketManager) handleOrderUpdate(msg []byte) {
 	setLastTime(wm)
-	if reconnect, _ := ReceiveListen(msg, wm.wsType); reconnect {
-		wm.reconnect <- struct{}{}
+	if reconnect, _ := ReceiveListen(msg, wm.wsType, wm.apiKey); reconnect {
+		log.Errorf("收到重连请求")
+		wm.triggerReconnect(false)
 	}
 }
 
+// Stop marks the manager stopped, cancels its context and closes the connection.
 func (wm *BinanceWebSocketManager) Stop() {
 	wm.mu.Lock()
 	defer wm.mu.Unlock()
@@ -387,9 +502,8 @@ func (wm *BinanceWebSocketManager) Stop() {
 	if wm.isStopped {
 		return
 	}
-
 	wm.isStopped = true
-	// 关闭 stopChannel(确保已经关闭,避免 panic)
+
 	select {
 	case <-wm.stopChannel:
 	default:
@@ -398,108 +512,185 @@ func (wm *BinanceWebSocketManager) Stop() {
 
 	if wm.cancelFunc != nil {
 		wm.cancelFunc()
+		wm.cancelFunc = nil
 	}
 
 	if wm.ws != nil {
 		if err := wm.ws.Close(); err != nil {
-			log.Error(fmt.Sprintf("key【%s】close失败", wm.apiKey), err)
-		} else {
-			log.Info(fmt.Sprintf("key【%s】close", wm.apiKey))
+			log.Errorf("WebSocket Close 错误 key:%s err:%v", wm.apiKey, err)
 		}
+		wm.ws = nil
 	}
 
-	// **重新创建 stopChannel,避免 Restart() 时无效**
-	wm.stopChannel = make(chan struct{})
+	log.Infof("WebSocket 已完全停止 key:%s", wm.apiKey)
+	wm.stopChannel = make(chan struct{}, 10)
 }
 
-// 重连机制
+// handleReconnect waits for reconnect signals and rebuilds the connection,
+// retrying with capped exponential backoff for at most maxRetries attempts.
 func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) {
-	maxRetries := 100 // 最大重试次数
+	const maxRetries = 100
+	baseDelay := time.Second * 2
 	retryCount := 0
 
 	for {
 		select {
 		case <-ctx.Done():
+			log.Infof("handleReconnect context done: %s", wm.apiKey)
 			return
+
 		case <-wm.reconnect:
+			wm.mu.Lock()
 			if wm.isStopped {
+				wm.mu.Unlock()
 				return
 			}
+			wm.mu.Unlock()
 
-			log.Warn("WebSocket 连接断开,尝试重连...")
-
-			if wm.ws != nil {
-				wm.ws.Close()
-			}
-
-			// 取消旧的上下文
-			if wm.cancelFunc != nil {
-				wm.cancelFunc()
-			}
+			log.Warnf("WebSocket 连接断开,准备重连 key:%s", wm.apiKey)
 
 			for {
+				wm.mu.Lock()
+				if wm.ws != nil {
+					_ = wm.ws.Close()
+					wm.ws = nil
+				}
+				if wm.cancelFunc != nil {
+					wm.cancelFunc()
+					wm.cancelFunc = nil
+				}
+				wm.mu.Unlock()
+
 				newCtx, cancel := context.WithCancel(context.Background())
-				wm.cancelFunc = cancel // 更新 cancelFunc
+				wm.mu.Lock()
+				wm.cancelFunc = cancel
+				wm.mu.Unlock()
 
 				if err := wm.connect(newCtx); err != nil {
-					log.Errorf("重连失败: %v", err)
+					log.Errorf("🔌 重连失败(%d/%d)key:%s,err: %v", retryCount+1, maxRetries, wm.apiKey, err)
 					cancel()
 					retryCount++
 
 					if retryCount >= maxRetries {
-						log.Error("重连失败次数过多,退出重连逻辑")
+						log.Errorf("❌ 重连失败次数过多,停止重连逻辑 key:%s", wm.apiKey)
+						wm.reconnecting.Store(false)
 						return
 					}
 
-					time.Sleep(5 * time.Second)
+					// Exponential backoff capped at 5 minutes. Any retryCount above 7
+					// is already past the cap, which also keeps the shift from overflowing.
+					delay := baseDelay * time.Duration(1<<uint(retryCount))
+					if retryCount > 7 || delay > time.Minute*5 {
+						delay = time.Minute * 5
+					}
+					log.Warnf("等待 %v 后重试...", delay)
+					time.Sleep(delay)
 					continue
 				}
 
+				log.Infof("✅ 重连成功 key:%s", wm.apiKey)
 				retryCount = 0
+				wm.reconnecting.Store(false)
+
+				// Start the dead-connection check once the reconnect has succeeded.
+				utility.SafeGo(func() { wm.startDeadCheck(newCtx) })
+
+				break
+			}
+		}
+	}
+}
+
+// startDeadCheck runs DeadCheck once a minute until ctx is cancelled or the manager is stopped.
+func (wm *BinanceWebSocketManager) startDeadCheck(ctx context.Context) {
+	ticker := time.NewTicker(1 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			if wm.isStopped {
 				return
 			}
+			wm.DeadCheck()
+		}
+	}
+}
+
+// DeadCheck forces a reconnect when the last activity time stored under the subscribe key is older than the per-type timeout.
+func (wm *BinanceWebSocketManager) DeadCheck() {
+	subKey := fmt.Sprintf(global.USER_SUBSCRIBE, wm.apiKey)
+	val, _ := helper.DefaultRedis.GetString(subKey)
+	if val == "" {
+		log.Warnf("没有订阅信息,无法进行假死检测")
+		return
+	}
+
+	var data binancedto.UserSubscribeState
+	_ = sonic.Unmarshal([]byte(val), &data)
+
+	var lastTime *time.Time
+	if wm.wsType == 0 {
+		lastTime = data.SpotLastTime
+	} else {
+		lastTime = data.FuturesLastTime
+	}
+
+	// Maximum silence allowed before the connection is considered dead.
+	var timeout time.Duration
+	if wm.wsType == 0 {
+		timeout = 40 * time.Second // spot pings every 20s, so 40s is enough
+	} else {
+		timeout = 6 * time.Minute // futures pings every 3 minutes
+	}
+
+	if lastTime != nil && time.Since(*lastTime) > timeout {
+		log.Warnf("检测到假死连接 key:%s type:%v, 距离上次通信: %v, 触发重连", wm.apiKey, wm.wsType, time.Since(*lastTime))
+		wm.triggerReconnect(true)
+	}
+}
+
+// startPingLoop sends a ping frame once a minute and requests a reconnect when the write fails.
+func (wm *BinanceWebSocketManager) startPingLoop(ctx context.Context) {
+	ticker := time.NewTicker(1 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			if wm.isStopped {
+				return
+			}
+			err := wm.ws.WriteMessage(websocket.PingMessage, []byte("ping"))
+			if err != nil {
+				log.Error("主动 Ping Binance 失败:", err)
+				wm.triggerReconnect(false)
+			}
 		}
 	}
 }
 
 // 定期删除listenkey 并重启ws
-func (wm *BinanceWebSocketManager) startListenKeyRenewal(ctx context.Context, listenKey string) {
-	time.Sleep(30 * time.Minute)
+// func (wm *BinanceWebSocketManager) startListenKeyRenewal(ctx context.Context, listenKey string) {
+// 	time.Sleep(30 * time.Minute)
 
-	select {
-	case <-ctx.Done():
-		return
-	default:
-		if err := wm.deleteListenKey(listenKey); err != nil {
-			log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
-		} else {
-			log.Debug("Successfully delete listenKey")
-			wm.reconnect <- struct{}{}
-		}
-	}
+// 	select {
+// 	case <-ctx.Done():
+// 		return
+// 	default:
+// 		if err := wm.deleteListenKey(listenKey); err != nil {
+// 			log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
+// 		} else {
+// 			log.Debug("Successfully delete listenKey")
+// 			wm.triggerReconnect()
+// 		}
+// 	}
 
-	// ticker := time.NewTicker(5 * time.Minute)
-	// defer ticker.Stop()
-
-	// for {
-	// 	select {
-	// 	case <-ticker.C:
-	// 		if wm.isStopped {
-	// 			return
-	// 		}
-
-	// 		if err := wm.deleteListenKey(listenKey); err != nil {
-	// 			log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
-	// 		} else {
-	// 			log.Debug("Successfully delete listenKey")
-	// 			wm.reconnect <- struct{}{}
-	// 			return
-	// 		}
-	// 	case <-ctx.Done():
-	// 		return
-	// 	}
-	// }
-}
+// }
 
 // 定时续期
 func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) {
@@ -528,49 +719,34 @@ func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) {
 /*
 删除listenkey
 */
-func (wm *BinanceWebSocketManager) deleteListenKey(listenKey string) error {
-	client, err := wm.createBinanceClient()
-	if err != nil {
-		return err
-	}
+// func (wm *BinanceWebSocketManager) deleteListenKey(listenKey string) error {
+// 	client, err := wm.createBinanceClient()
+// 	if err != nil {
+// 		return err
+// 	}
 
-	var resp []byte
+// 	var resp []byte
 
-	switch wm.wsType {
-	case 0:
-		path := fmt.Sprintf("/api/v3/userDataStream")
-		params := map[string]interface{}{
-			"listenKey": listenKey,
-		}
-		resp, _, err = client.SendSpotRequestByKey(path, "DELETE", params)
+// 	switch wm.wsType {
+// 	case 0:
+// 		path := fmt.Sprintf("/api/v3/userDataStream")
+// 		params := map[string]interface{}{
+// 			"listenKey": listenKey,
+// 		}
+// 		resp, _, err = client.SendSpotRequestByKey(path, "DELETE", params)
 
-		log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
-	case 1:
-		resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "DELETE", nil)
-		log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
-	default:
-		return errors.New("unknown ws type")
-	}
+// 		log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
+// 	case 1:
+// 		resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "DELETE", nil)
+// 		log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
+// 	default:
+// 		return errors.New("unknown ws type")
+// 	}
 
-	return err
-}
+// 	return err
+// }
 
 func (wm *BinanceWebSocketManager) renewListenKey(listenKey string) error {
-	// payloadParam := map[string]interface{}{
-	// 	"listenKey": listenKey,
-	// 	"apiKey":    wm.apiKey,
-	// }
-
-	// params := map[string]interface{}{
-	// 	"id":     getUUID(),
-	// 	"method": "userDataStream.ping",
-	// 	"params": payloadParam,
-	// }
-
-	// if err := wm.ws.WriteJSON(params); err != nil {
-	// 	return err
-	// }
-	// wm.ws.WriteJSON()
 	client, err := wm.createBinanceClient()
 	if err != nil {
 		return err