package excservice import ( "context" "errors" "fmt" "go-admin/common/global" "go-admin/common/helper" "go-admin/models/binancedto" "go-admin/models/commondto" "go-admin/pkg/jsonhelper" "go-admin/pkg/utility" "go-admin/services/proxy" "strings" "sync" "sync/atomic" "time" "github.com/bytedance/sonic" log "github.com/go-admin-team/go-admin-core/logger" "github.com/gorilla/websocket" ) type BinanceWebSocketManager struct { mu sync.Mutex stopOnce sync.Once // 新增,确保Stop只执行一次 ws *websocket.Conn stopChannel chan struct{} url string wsType int apiKey string apiSecret string proxyType string proxyAddress string reconnect chan struct{} isStopped atomic.Bool cancelFunc context.CancelFunc listenKey string reconnecting atomic.Bool ConnectTime time.Time lastPingTime atomic.Value forceReconnectTimer *time.Timer } // sendPing 使用锁来确保发送ping时的线程安全 func (wm *BinanceWebSocketManager) sendPing() error { wm.mu.Lock() defer wm.mu.Unlock() if wm.ws == nil { return errors.New("websocket connection is nil") } wm.lastPingTime.Store(time.Now()) // 使用控制帧发送 Ping,避免与普通消息写入竞争 return wm.ws.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(10*time.Second)) } // Stop 优雅地停止WebSocket管理器 func (wm *BinanceWebSocketManager) Stop() { wm.stopOnce.Do(func() { wm.mu.Lock() if wm.isStopped.Load() { wm.mu.Unlock() return } wm.isStopped.Store(true) wm.mu.Unlock() if wm.cancelFunc != nil { wm.cancelFunc() } wm.closeConn() // 统一调用带锁的关闭方法 // 尝试关闭stopChannel,如果已经关闭则忽略 select { case <-wm.stopChannel: default: close(wm.stopChannel) } log.Infof("WebSocket 已完全停止 key:%s", wm.apiKey) }) } // handleReconnect 处理重连逻辑 func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) { const maxRetries = 10 backoff := time.Second for { select { case <-wm.reconnect: // 当收到重连信号时,不再依赖 CompareAndSwap 直接跳过的逻辑。 // 之前的实现中,triggerReconnect 已将 reconnecting 标记为 true, // 导致这里 CompareAndSwap 失败而 continue,从而丢失首次重连信号。 // 修复:收到信号后直接置位为重连状态并继续执行重连流程,保证首次重连一定发生。 wm.reconnecting.Store(true) // 在开始重连循环之前,先安全地关闭旧的连接 wm.closeConn() retryCount := 0 for retryCount < maxRetries { select { case <-wm.stopChannel: wm.reconnecting.Store(false) return default: } log.Infof("WebSocket (%d) 正在尝试重连,第 %d 次...", wm.wsType, retryCount+1) if err := wm.connect(ctx); err == nil { log.Infof("WebSocket (%d) 重连成功", wm.wsType) wm.reconnecting.Store(false) setLastTime(wm) go wm.startDeadCheck(ctx) // 重连成功后,重新启动假死检测 break // 跳出重连循环 } retryCount++ time.Sleep(backoff) backoff *= 2 if backoff > time.Minute { backoff = time.Minute } } if retryCount >= maxRetries { log.Errorf("WebSocket (%s) 重连失败次数过多,停止重连", wm.wsType) wm.Stop() return } case <-ctx.Done(): wm.reconnecting.Store(false) return } } } // 已有连接 var SpotSockets = map[string]*BinanceWebSocketManager{} var FutureSockets = map[string]*BinanceWebSocketManager{} /** * 创建新的Binance WebSocket管理器 * @param wsType WebSocket类型:0-现货,1-合约 * @param apiKey API密钥 * @param apiSecret API密钥 * @param proxyType 代理类型 * @param proxyAddress 代理地址 * @return WebSocket管理器实例 */ func NewBinanceWebSocketManager(wsType int, apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager { url := "" switch wsType { case 0: url = "wss://stream.binance.com:9443/ws" case 1: url = "wss://fstream.binance.com/ws" } wm := &BinanceWebSocketManager{ stopChannel: make(chan struct{}, 10), reconnect: make(chan struct{}, 10), url: url, wsType: wsType, apiKey: apiKey, apiSecret: apiSecret, proxyType: proxyType, proxyAddress: proxyAddress, } // 初始化最后ping时间 wm.lastPingTime.Store(time.Now()) wm.isStopped.Store(false) return wm } /** * 获取API密钥 * @return API密钥 */ func (wm *BinanceWebSocketManager) GetKey() string { return wm.apiKey } /** * 启动WebSocket连接 */ func (wm *BinanceWebSocketManager) Start() { utility.SafeGo(wm.run) } /** * 重启连接,更新配置参数 * @param apiKey 新的API密钥 * @param apiSecret 新的API密钥 * @param proxyType 新的代理类型 * @param proxyAddress 新的代理地址 * @return WebSocket管理器实例 */ func (wm *BinanceWebSocketManager) Restart(apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager { wm.mu.Lock() defer wm.mu.Unlock() wm.apiKey = apiKey wm.apiSecret = apiSecret wm.proxyType = proxyType wm.proxyAddress = proxyAddress if wm.isStopped.Load() { wm.isStopped.Store(false) utility.SafeGo(wm.run) } else { log.Warnf("调用restart") wm.triggerReconnect(true) } return wm } /** * 触发重连机制 * @param force 是否强制重连 */ func (wm *BinanceWebSocketManager) triggerReconnect(force bool) { if force { wm.reconnecting.Store(false) // 强制重置标志位 } if wm.reconnecting.CompareAndSwap(false, true) { log.Warnf("准备重连 key: %s wsType: %v", wm.apiKey, wm.wsType) // 发送信号触发重连协程 select { case wm.reconnect <- struct{}{}: default: // 防止阻塞,如果通道满了就跳过 log.Debugf("reconnect 信号已存在,跳过 key:%s", wm.apiKey) } } } /** * 外部重启函数 * @param wm WebSocket管理器实例 */ func Restart(wm *BinanceWebSocketManager) { log.Warnf("调用restart") wm.triggerReconnect(true) } /** * 主运行循环 */ func (wm *BinanceWebSocketManager) run() { ctx, cancel := context.WithCancel(context.Background()) wm.cancelFunc = cancel // utility.SafeGo(wm.handleSignal) // 计算错误记录键 errKey := fmt.Sprintf(global.API_WEBSOCKET_ERR, wm.apiKey) errMessage := commondto.WebSocketErr{Time: time.Now()} helper.DefaultRedis.SetString(errKey, jsonhelper.ToJsonString(errMessage)) // 在主循环前统一启动重连与假死检测,避免重复启动 utility.SafeGo(func() { wm.handleReconnect(ctx) }) utility.SafeGo(func() { wm.startDeadCheck(ctx) }) for { select { case <-ctx.Done(): return default: if err := wm.connect(ctx); err != nil { wm.handleConnectionError(errKey, err) if wm.isErrorCountExceeded(errKey) { log.Errorf("连接 %s WebSocket 时出错次数过多,停止 WebSocket 管理器: %v", wm.wsType, wm.apiKey) wm.Stop() return } time.Sleep(5 * time.Second) continue } <-wm.stopChannel log.Infof("停止 %s WebSocket 管理器...", getWsTypeName(wm.wsType)) wm.Stop() return } } } /** * 处理WebSocket连接错误 * @param errKey Redis错误记录键 * @param err 错误信息 */ func (wm *BinanceWebSocketManager) handleConnectionError(errKey string, err error) { // 从 Redis 获取错误记录 var errMessage commondto.WebSocketErr val, _ := helper.DefaultRedis.GetString(errKey) if val != "" { sonic.UnmarshalString(val, &errMessage) } // 更新错误记录 errMessage.Count++ errMessage.Time = time.Now() errMessage.ErrorMessage = err.Error() // 将错误记录保存到 Redis if data, err := sonic.MarshalString(errMessage); err == nil { helper.DefaultRedis.SetString(errKey, data) } // 记录错误日志 log.Errorf("连接 %s WebSocket 时出错: %v, 错误: %v", wm.wsType, wm.apiKey, err) } /** * 检查错误次数是否超过阈值 * @param errKey Redis错误记录键 * @return 是否超过阈值 */ func (wm *BinanceWebSocketManager) isErrorCountExceeded(errKey string) bool { val, _ := helper.DefaultRedis.GetString(errKey) if val == "" { return false } var errMessage commondto.WebSocketErr if err := sonic.UnmarshalString(val, &errMessage); err != nil { return false } return errMessage.Count >= 5 } /** * 建立WebSocket连接 * @param ctx 上下文 * @return 错误信息 */ func (wm *BinanceWebSocketManager) connect(ctx context.Context) error { dialer, err := proxy.GetDialer(wm.proxyType, wm.proxyAddress) if err != nil { return err } listenKey, err := wm.getListenKey() if err != nil { return err } wm.listenKey = listenKey url := fmt.Sprintf("%s/%s", wm.url, listenKey) wm.ws, _, err = dialer.Dial(url, nil) if err != nil { return err } // 连接成功,更新连接时间和ping时间 wm.ConnectTime = time.Now() wm.lastPingTime.Store(time.Now()) setLastTime(wm) log.Infof("已连接到 Binance %s WebSocket【%s】 key:%s", getWsTypeName(wm.wsType), wm.apiKey, listenKey) // 设置读超时 var readTimeout time.Duration if wm.wsType == 0 { readTimeout = 2 * time.Minute } else { readTimeout = 4 * time.Minute } _ = wm.ws.SetReadDeadline(time.Now().Add(readTimeout)) // Ping处理 wm.ws.SetPingHandler(func(appData string) error { log.Infof("收到 wstype: %d key:%s Ping 消息【%s】", wm.wsType, wm.apiKey, appData) for x := 0; x < 5; x++ { if err := wm.ws.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(time.Second*10)); err != nil { log.Errorf("binance 回应pong失败 次数:%d err:%v", x, err) time.Sleep(time.Second * 1) continue } break } // 更新ping时间和Redis记录,并刷新读超时 wm.lastPingTime.Store(time.Now()) setLastTime(wm) _ = wm.ws.SetReadDeadline(time.Now().Add(readTimeout)) return nil }) // Pong处理:收到服务端Pong时刷新心跳与读超时 wm.ws.SetPongHandler(func(appData string) error { wm.lastPingTime.Store(time.Now()) setLastTime(wm) _ = wm.ws.SetReadDeadline(time.Now().Add(readTimeout)) return nil }) // 启动必要协程(避免在此处重复启动重连与假死检测) utility.SafeGo(func() { wm.startListenKeyRenewal2(ctx) }) utility.SafeGo(func() { wm.readMessages(ctx) }) utility.SafeGo(func() { wm.startPingLoop(ctx) }) utility.SafeGo(func() { wm.start24HourReconnectTimer(ctx) }) // 启动23小时强制重连 return nil } // ReplaceConnection 创建新连接并关闭旧连接,实现无缝连接替换 func (wm *BinanceWebSocketManager) ReplaceConnection() error { wm.mu.Lock() if wm.isStopped.Load() { wm.mu.Unlock() return errors.New("WebSocket 已停止") } oldCtxCancel := wm.cancelFunc oldConn := wm.ws wm.mu.Unlock() log.Infof("🔄 正在替换连接: %s", wm.apiKey) // 步骤 1:先获取新的 listenKey 和连接 newListenKey, err := wm.getListenKey() if err != nil { return fmt.Errorf("获取新 listenKey 失败: %w", err) } dialer, err := proxy.GetDialer(wm.proxyType, wm.proxyAddress) if err != nil { return err } newURL := fmt.Sprintf("%s/%s", wm.url, newListenKey) newConn, _, err := dialer.Dial(newURL, nil) if err != nil { return fmt.Errorf("新连接 Dial 失败: %w", err) } // 步骤 2:创建新的上下文并启动协程 newCtx, newCancel := context.WithCancel(context.Background()) // 设置 ping handler newConn.SetPingHandler(func(appData string) error { // log.Infof("收到 Ping(新连接) key:%s msg:%s", wm.apiKey, appData) for x := 0; x < 5; x++ { if err := newConn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(10*time.Second)); err != nil { log.Errorf("Pong 失败 %d 次 err:%v", x, err) time.Sleep(time.Second) continue } break } setLastTime(wm) return nil }) // 步骤 3:安全切换连接 wm.mu.Lock() wm.ws = newConn wm.listenKey = newListenKey wm.ConnectTime = time.Now() wm.cancelFunc = newCancel wm.mu.Unlock() log.Infof("✅ 替换连接成功: %s listenKey: %s", wm.apiKey, newListenKey) // 步骤 4:启动新连接协程 go wm.startListenKeyRenewal2(newCtx) go wm.readMessages(newCtx) go wm.startPingLoop(newCtx) // go wm.startDeadCheck(newCtx) // 步骤 5:关闭旧连接、取消旧协程 if oldCtxCancel != nil { oldCtxCancel() } if oldConn != nil { _ = oldConn.Close() log.Infof("🔒 旧连接已关闭: %s", wm.apiKey) } return nil } // 更新最后通信时间 func setLastTime(wm *BinanceWebSocketManager) { subKey := fmt.Sprintf(global.USER_SUBSCRIBE, wm.apiKey) val, _ := helper.DefaultRedis.GetString(subKey) now := time.Now() var data binancedto.UserSubscribeState if val != "" { sonic.Unmarshal([]byte(val), &data) } if wm.wsType == 0 { data.SpotLastTime = &now } else { data.FuturesLastTime = &now } val, _ = sonic.MarshalString(&data) if val != "" { helper.DefaultRedis.SetString(subKey, val) } } // 复用创建HTTP客户端的逻辑 func (wm *BinanceWebSocketManager) createBinanceClient() (*helper.BinanceClient, error) { return helper.NewBinanceClient(wm.apiKey, wm.apiSecret, wm.proxyType, wm.proxyAddress) } // 获取listenKey func (wm *BinanceWebSocketManager) getListenKey() (string, error) { client, err := wm.createBinanceClient() if err != nil { return "", err } var resp []byte switch wm.wsType { case 0: resp, _, err = client.SendSpotRequestByKey("/api/v3/userDataStream", "POST", nil) case 1: resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "POST", nil) default: log.Error("链接类型错误") return "", errors.New("链接类型错误") } if err != nil { return "", err } var dataMap map[string]interface{} if err := sonic.Unmarshal(resp, &dataMap); err != nil { return "", err } if v, ok := dataMap["listenKey"]; ok { s, ok2 := v.(string) if !ok2 || s == "" { return "", errors.New("listenKey 类型错误或为空") } return s, nil } return "", errors.New("listenKey 不存在") } // 接收消息 func (wm *BinanceWebSocketManager) readMessages(ctx context.Context) { for { select { case <-ctx.Done(): return default: if wm.isStopped.Load() { return } _, msg, err := wm.ws.ReadMessage() if err != nil { // 检查是否是由于我们主动关闭连接导致的错误 if wm.isStopped.Load() { log.Infof("WebSocket read loop gracefully stopped for key: %s", wm.apiKey) return } // 如果不是主动关闭,再判断是否是连接关闭错误,并触发重连 if strings.Contains(err.Error(), "websocket: close") { log.Errorf("WebSocket connection closed unexpectedly, triggering reconnect for key: %s. Error: %v", wm.apiKey, err) wm.triggerReconnect(false) } else { log.Errorf("Error reading message for key: %s. Error: %v", wm.apiKey, err) } return // 任何错误发生后都应退出读取循环 } wm.handleOrderUpdate(msg) } } } func (wm *BinanceWebSocketManager) handleOrderUpdate(msg []byte) { setLastTime(wm) if reconnect, fatal, err := ReceiveListen(msg, wm.wsType, wm.apiKey); err != nil { log.Errorf("处理消息时出错: %v", err) // 根据错误类型决定是否停止 if fatal { log.Errorf("收到致命错误,将停止 WebSocket 管理器 for key: %s", wm.apiKey) wm.Stop() } return } else if fatal { log.Errorf("收到致命错误,将停止 WebSocket 管理器 for key: %s", wm.apiKey) wm.Stop() return } else if reconnect { log.Warnf("收到重连请求,将触发重连 for key: %s", wm.apiKey) wm.triggerReconnect(false) } } // closeConn 安全地关闭WebSocket连接 func (wm *BinanceWebSocketManager) closeConn() { wm.mu.Lock() defer wm.mu.Unlock() if wm.ws != nil { _ = wm.ws.Close() wm.ws = nil } } // 主动心跳发送机制 func (wm *BinanceWebSocketManager) startPingLoop(ctx context.Context) { ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: if wm.isStopped.Load() { return } if err := wm.sendPing(); err != nil { log.Errorf("主动 Ping Binance 失败: %v", err) wm.triggerReconnect(false) return // 退出循环,等待重连 } } } } // 定时续期 func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) { ticker := time.NewTicker(30 * time.Minute) defer func() { log.Debug("定时续期任务退出 key:", wm.apiKey) ticker.Stop() }() for { select { case <-ticker.C: if wm.isStopped.Load() { return } if err := wm.renewListenKey(wm.listenKey); err != nil { log.Errorf("Failed to renew listenKey: ,type:%v key: %s err:%v", wm.wsType, wm.apiKey, err) } case <-ctx.Done(): return } } } func (wm *BinanceWebSocketManager) renewListenKey(listenKey string) error { client, err := wm.createBinanceClient() if err != nil { return err } var resp []byte switch wm.wsType { case 0: path := fmt.Sprintf("/api/v3/userDataStream?listenKey=%s", listenKey) resp, _, err = client.SendSpotRequestByKey(path, "PUT", nil) log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp))) case 1: // path := fmt.Sprintf("/fapi/v1/listenKey", listenKey) resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "PUT", nil) log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp))) default: return errors.New("unknown ws type") } return nil } func getWsTypeName(wsType int) string { switch wsType { case 0: return "spot" case 1: return "futures" default: return "unknown" } } /** * 启动24小时强制重连定时器 * @param ctx 上下文 */ func (wm *BinanceWebSocketManager) start24HourReconnectTimer(ctx context.Context) { // Binance要求不到24小时就需要主动断开重连 // 设置为23小时,留出一些缓冲时间 // duration := 23 * time.Hour duration := 10 * time.Minute wm.forceReconnectTimer = time.NewTimer(duration) defer func() { if wm.forceReconnectTimer != nil { wm.forceReconnectTimer.Stop() } }() select { case <-ctx.Done(): return case <-wm.forceReconnectTimer.C: if !wm.isStopped.Load() { log.Warnf("23小时强制重连触发 key:%s wsType:%v", wm.apiKey, wm.wsType) wm.triggerReconnect(true) } } } /** * 改进的假死检测机制 * @param ctx 上下文 */ func (wm *BinanceWebSocketManager) startDeadCheck(ctx context.Context) { // 缩短检测间隔,提高响应速度 ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: if wm.isStopped.Load() { return } wm.DeadCheck() } } } /** * 改进的假死检测逻辑 */ func (wm *BinanceWebSocketManager) DeadCheck() { // 检查最后ping时间 lastPing := wm.lastPingTime.Load().(time.Time) // 定义最大静默时间(根据WebSocket类型调整) var timeout time.Duration if wm.wsType == 0 { timeout = 3 * time.Minute // 现货:3分钟无ping视为假死 } else { timeout = 6 * time.Minute // 合约:6分钟无ping视为假死 } if time.Since(lastPing) > timeout { log.Warnf("检测到假死连接(基于ping时间) key:%s type:%v, 距离上次ping: %v, 触发重连", wm.apiKey, wm.wsType, time.Since(lastPing)) wm.triggerReconnect(true) return } // 同时检查Redis中的记录作为备用检测 subKey := fmt.Sprintf(global.USER_SUBSCRIBE, wm.apiKey) val, _ := helper.DefaultRedis.GetString(subKey) if val == "" { log.Debugf("没有订阅信息,跳过Redis假死检测") return } var data binancedto.UserSubscribeState _ = sonic.Unmarshal([]byte(val), &data) var lastTime *time.Time if wm.wsType == 0 { lastTime = data.SpotLastTime } else { lastTime = data.FuturesLastTime } // Redis记录的超时时间设置得更长一些 var redisTimeout time.Duration if wm.wsType == 0 { redisTimeout = 3 * time.Minute } else { redisTimeout = 6 * time.Minute } if lastTime != nil && time.Since(*lastTime) > redisTimeout { log.Warnf("检测到假死连接(基于Redis记录) key:%s type:%v, 距离上次通信: %v, 触发重连", wm.apiKey, wm.wsType, time.Since(*lastTime)) wm.triggerReconnect(true) } }