package excservice import ( "context" "crypto/tls" "errors" "fmt" "go-admin/common/global" "go-admin/common/helper" "go-admin/models/binancedto" "go-admin/models/commondto" "go-admin/pkg/jsonhelper" "go-admin/pkg/utility" "math/rand" "net" "net/http" "net/url" "os" "os/signal" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/bytedance/sonic" log "github.com/go-admin-team/go-admin-core/logger" "github.com/gorilla/websocket" "golang.org/x/net/proxy" ) type BinanceWebSocketManager struct { ws *websocket.Conn stopChannel chan struct{} url string /* 0-现货 1-合约 */ wsType int apiKey string apiSecret string proxyType string proxyAddress string reconnect chan struct{} isStopped bool // 标记 WebSocket 是否已主动停止 mu sync.Mutex // 用于控制并发访问 isStopped cancelFunc context.CancelFunc listenKey string // 新增字段 reconnecting atomic.Bool // 防止重复重连 ConnectTime time.Time // 当前连接建立时间 } // 已有连接 var SpotSockets = map[string]*BinanceWebSocketManager{} var FutureSockets = map[string]*BinanceWebSocketManager{} func NewBinanceWebSocketManager(wsType int, apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager { url := "" switch wsType { case 0: url = "wss://stream.binance.com:9443/ws" case 1: url = "wss://fstream.binance.com/ws" } return &BinanceWebSocketManager{ stopChannel: make(chan struct{}, 10), reconnect: make(chan struct{}, 10), isStopped: false, url: url, wsType: wsType, apiKey: apiKey, apiSecret: apiSecret, proxyType: proxyType, proxyAddress: proxyAddress, } } func (wm *BinanceWebSocketManager) GetKey() string { return wm.apiKey } func (wm *BinanceWebSocketManager) Start() { utility.SafeGo(wm.run) // wm.run() } // 重启连接 func (wm *BinanceWebSocketManager) Restart(apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager { wm.mu.Lock() defer wm.mu.Unlock() wm.apiKey = apiKey wm.apiSecret = apiSecret wm.proxyType = proxyType wm.proxyAddress = proxyAddress if wm.isStopped { wm.isStopped = false utility.SafeGo(wm.run) } else { log.Warnf("调用restart") wm.triggerReconnect(true) } return wm } // 触发重连 func (wm *BinanceWebSocketManager) triggerReconnect(force bool) { if force { wm.reconnecting.Store(false) // 强制重置标志位 } if wm.reconnecting.CompareAndSwap(false, true) { log.Warnf("准备重连 key: %s wsType: %v", wm.apiKey, wm.wsType) // 发送信号触发重连协程 select { case wm.reconnect <- struct{}{}: default: // 防止阻塞,如果通道满了就跳过 } } } func Restart(wm *BinanceWebSocketManager) { log.Warnf("调用restart") wm.triggerReconnect(true) } func (wm *BinanceWebSocketManager) run() { ctx, cancel := context.WithCancel(context.Background()) wm.cancelFunc = cancel utility.SafeGo(wm.handleSignal) // 计算错误记录键 errKey := fmt.Sprintf(global.API_WEBSOCKET_ERR, wm.apiKey) errMessage := commondto.WebSocketErr{Time: time.Now()} helper.DefaultRedis.SetString(errKey, jsonhelper.ToJsonString(errMessage)) for { select { case <-ctx.Done(): return default: if err := wm.connect(ctx); err != nil { wm.handleConnectionError(errKey, err) if wm.isErrorCountExceeded(errKey) { log.Error("连接 %s WebSocket 时出错次数过多,停止 WebSocket 管理器: %v", wm.wsType, wm.apiKey) wm.Stop() return } time.Sleep(5 * time.Second) continue } <-wm.stopChannel log.Info("停止 %s WebSocket 管理器...", getWsTypeName(wm.wsType)) wm.Stop() return } } } // handleConnectionError 处理 WebSocket 连接错误 func (wm *BinanceWebSocketManager) handleConnectionError(errKey string, err error) { // 从 Redis 获取错误记录 var errMessage commondto.WebSocketErr val, _ := helper.DefaultRedis.GetString(errKey) if val != "" { sonic.UnmarshalString(val, &errMessage) } // 更新错误记录 errMessage.Count++ errMessage.Time = time.Now() errMessage.ErrorMessage = err.Error() // 将错误记录保存到 Redis if data, err := sonic.MarshalString(errMessage); err == nil { helper.DefaultRedis.SetString(errKey, data) } // 记录错误日志 log.Error("连接 %s WebSocket 时出错: %v, 错误: %v", wm.wsType, wm.apiKey, err) } // isErrorCountExceeded 检查错误次数是否超过阈值 func (wm *BinanceWebSocketManager) isErrorCountExceeded(errKey string) bool { val, _ := helper.DefaultRedis.GetString(errKey) if val == "" { return false } var errMessage commondto.WebSocketErr if err := sonic.UnmarshalString(val, &errMessage); err != nil { return false } return errMessage.Count >= 5 } // 处理终止信号 func (wm *BinanceWebSocketManager) handleSignal() { ch := make(chan os.Signal) signal.Notify(ch, os.Interrupt) <-ch wm.Stop() } func (wm *BinanceWebSocketManager) connect(ctx context.Context) error { dialer, err := wm.getDialer() if err != nil { return err } listenKey, err := wm.getListenKey() if err != nil { return err } wm.listenKey = listenKey url := fmt.Sprintf("%s/%s", wm.url, listenKey) wm.ws, _, err = dialer.Dial(url, nil) if err != nil { return err } // 连接成功,更新连接时间 wm.ConnectTime = time.Now() log.Info(fmt.Sprintf("已连接到 Binance %s WebSocket【%s】 key:%s", getWsTypeName(wm.wsType), wm.apiKey, listenKey)) // Ping处理 wm.ws.SetPingHandler(func(appData string) error { log.Info(fmt.Sprintf("收到 wstype: %v key:%s Ping 消息【%s】", wm.wsType, wm.apiKey, appData)) for x := 0; x < 5; x++ { if err := wm.ws.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(time.Second*10)); err != nil { log.Error("binance 回应pong失败 次数:", strconv.Itoa(x), " err:", err) time.Sleep(time.Second * 1) continue } break } setLastTime(wm) return nil }) utility.SafeGo(func() { wm.startListenKeyRenewal2(ctx) }) utility.SafeGo(func() { wm.readMessages(ctx) }) utility.SafeGo(func() { wm.handleReconnect(ctx) }) utility.SafeGo(func() { wm.startPingLoop(ctx) }) // utility.SafeGo(func() { wm.startDeadCheck(ctx) }) return nil } // ReplaceConnection 创建新连接并关闭旧连接,实现无缝连接替换 func (wm *BinanceWebSocketManager) ReplaceConnection() error { wm.mu.Lock() if wm.isStopped { wm.mu.Unlock() return errors.New("WebSocket 已停止") } oldCtxCancel := wm.cancelFunc oldConn := wm.ws wm.mu.Unlock() log.Infof("🔄 正在替换连接: %s", wm.apiKey) // 步骤 1:先获取新的 listenKey 和连接 newListenKey, err := wm.getListenKey() if err != nil { return fmt.Errorf("获取新 listenKey 失败: %w", err) } dialer, err := wm.getDialer() if err != nil { return err } newURL := fmt.Sprintf("%s/%s", wm.url, newListenKey) newConn, _, err := dialer.Dial(newURL, nil) if err != nil { return fmt.Errorf("新连接 Dial 失败: %w", err) } // 步骤 2:创建新的上下文并启动协程 newCtx, newCancel := context.WithCancel(context.Background()) // 设置 ping handler newConn.SetPingHandler(func(appData string) error { log.Infof("收到 Ping(新连接) key:%s msg:%s", wm.apiKey, appData) for x := 0; x < 5; x++ { if err := newConn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(10*time.Second)); err != nil { log.Errorf("Pong 失败 %d 次 err:%v", x, err) time.Sleep(time.Second) continue } break } setLastTime(wm) return nil }) // 步骤 3:安全切换连接 wm.mu.Lock() wm.ws = newConn wm.listenKey = newListenKey wm.ConnectTime = time.Now() wm.cancelFunc = newCancel wm.mu.Unlock() log.Infof("✅ 替换连接成功: %s listenKey: %s", wm.apiKey, newListenKey) // 步骤 4:启动新连接协程 go wm.startListenKeyRenewal2(newCtx) go wm.readMessages(newCtx) go wm.handleReconnect(newCtx) go wm.startPingLoop(newCtx) // go wm.startDeadCheck(newCtx) // 步骤 5:关闭旧连接、取消旧协程 if oldCtxCancel != nil { oldCtxCancel() } if oldConn != nil { _ = oldConn.Close() log.Infof("🔒 旧连接已关闭: %s", wm.apiKey) } return nil } // 更新最后通信时间 func setLastTime(wm *BinanceWebSocketManager) { subKey := fmt.Sprintf(global.USER_SUBSCRIBE, wm.apiKey) val, _ := helper.DefaultRedis.GetString(subKey) now := time.Now() var data binancedto.UserSubscribeState if val != "" { sonic.Unmarshal([]byte(val), &data) } if wm.wsType == 0 { data.SpotLastTime = &now } else { data.FuturesLastTime = &now } val, _ = sonic.MarshalString(&data) if val != "" { helper.DefaultRedis.SetString(subKey, val) } } func (wm *BinanceWebSocketManager) getDialer() (*websocket.Dialer, error) { if wm.proxyAddress == "" { return &websocket.Dialer{}, nil } if !strings.HasPrefix(wm.proxyAddress, "http://") && !strings.HasPrefix(wm.proxyAddress, "https://") && !strings.HasPrefix(wm.proxyAddress, "socks5://") { wm.proxyAddress = wm.proxyType + "://" + wm.proxyAddress } proxyURL, err := url.Parse(wm.proxyAddress) if err != nil { return nil, fmt.Errorf("failed to parse proxy URL: %v", err) } transport := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: false}, } switch proxyURL.Scheme { case "socks5": return wm.createSocks5Dialer(proxyURL) case "http", "https": transport.Proxy = http.ProxyURL(proxyURL) return &websocket.Dialer{Proxy: transport.Proxy, TLSClientConfig: transport.TLSClientConfig}, nil default: return nil, fmt.Errorf("unsupported proxy scheme: %s", proxyURL.Scheme) } } func (wm *BinanceWebSocketManager) createSocks5Dialer(proxyURL *url.URL) (*websocket.Dialer, error) { auth := &proxy.Auth{} if proxyURL.User != nil { auth.User = proxyURL.User.Username() auth.Password, _ = proxyURL.User.Password() } socksDialer, err := proxy.SOCKS5("tcp", proxyURL.Host, auth, proxy.Direct) if err != nil { return nil, fmt.Errorf("failed to create SOCKS5 proxy dialer: %v", err) } return &websocket.Dialer{ NetDialContext: func(ctx context.Context, network, address string) (net.Conn, error) { return socksDialer.Dial(network, address) }, }, nil } // 复用创建HTTP客户端的逻辑 func (wm *BinanceWebSocketManager) createBinanceClient() (*helper.BinanceClient, error) { return helper.NewBinanceClient(wm.apiKey, wm.apiSecret, wm.proxyType, wm.proxyAddress) } // 获取listenKey func (wm *BinanceWebSocketManager) getListenKey() (string, error) { client, err := wm.createBinanceClient() if err != nil { return "", err } var resp []byte switch wm.wsType { case 0: resp, _, err = client.SendSpotRequestByKey("/api/v3/userDataStream", "POST", nil) case 1: resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "POST", nil) default: log.Error("链接类型错误") return "", errors.New("链接类型错误") } if err != nil { return "", err } var dataMap map[string]interface{} if err := sonic.Unmarshal(resp, &dataMap); err != nil { return "", err } if listenKey, ok := dataMap["listenKey"]; ok { return listenKey.(string), nil } return "", errors.New("listenKey 不存在") } // 接收消息 func (wm *BinanceWebSocketManager) readMessages(ctx context.Context) { defer wm.ws.Close() for { select { case <-ctx.Done(): return default: if wm.isStopped { return } _, msg, err := wm.ws.ReadMessage() if err != nil && strings.Contains(err.Error(), "websocket: close") { if !wm.isStopped { log.Error("收到关闭消息", err.Error()) wm.triggerReconnect(false) } log.Error("websocket 关闭") return } else if err != nil { log.Error("读取消息时出错: %v", err) return } wm.handleOrderUpdate(msg) } } } func (wm *BinanceWebSocketManager) handleOrderUpdate(msg []byte) { setLastTime(wm) if reconnect, _ := ReceiveListen(msg, wm.wsType, wm.apiKey); reconnect { log.Errorf("收到重连请求") wm.triggerReconnect(false) } } func (wm *BinanceWebSocketManager) Stop() { wm.mu.Lock() defer wm.mu.Unlock() if wm.isStopped { return } wm.isStopped = true // 关闭 stopChannel(确保已经关闭,避免 panic) select { case <-wm.stopChannel: default: close(wm.stopChannel) } if wm.cancelFunc != nil { wm.cancelFunc() } if wm.ws != nil { if err := wm.ws.Close(); err != nil { log.Error(fmt.Sprintf("key【%s】close失败", wm.apiKey), err) } else { log.Info(fmt.Sprintf("key【%s】close", wm.apiKey)) } } // **重新创建 stopChannel,避免 Restart() 时无效** wm.stopChannel = make(chan struct{}) } // 重连机制 func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) { maxRetries := 5 // 最大重试次数 retryCount := 0 for { select { case <-ctx.Done(): return case <-wm.reconnect: if wm.isStopped { return } log.Warn("WebSocket 连接断开,尝试重连...") if wm.ws != nil { wm.ws.Close() } // 取消旧的上下文 if wm.cancelFunc != nil { wm.cancelFunc() } for { newCtx, cancel := context.WithCancel(context.Background()) wm.cancelFunc = cancel // 更新 cancelFunc if err := wm.connect(newCtx); err != nil { log.Errorf("重连失败: %v", err) cancel() retryCount++ if retryCount >= maxRetries { wm.reconnecting.Store(false) log.Error("重连失败次数过多,退出重连逻辑") return } time.Sleep(5 * time.Second) continue } // 重连成功,清除标记 wm.reconnecting.Store(false) retryCount = 0 return } } } } // 假死检测 func (wm *BinanceWebSocketManager) DeadCheck() { subKey := fmt.Sprintf(global.USER_SUBSCRIBE, wm.apiKey) val, _ := helper.DefaultRedis.GetString(subKey) if val == "" { log.Warnf("没有订阅信息,无法进行假死检测") return } var data binancedto.UserSubscribeState _ = sonic.Unmarshal([]byte(val), &data) var lastTime *time.Time if wm.wsType == 0 { lastTime = data.SpotLastTime } else { lastTime = data.FuturesLastTime } // 定义最大静默时间(超出视为假死) var timeout time.Duration if wm.wsType == 0 { timeout = 40 * time.Second // Spot 每 20s ping,40s 足够 } else { timeout = 6 * time.Minute // Futures 每 3 分钟 ping } if lastTime != nil && time.Since(*lastTime) > timeout { log.Warnf("检测到假死连接 key:%s type:%v, 距离上次通信: %v, 触发重连", wm.apiKey, wm.wsType, time.Since(*lastTime)) wm.triggerReconnect(true) } } // 主动心跳发送机制 func (wm *BinanceWebSocketManager) startPingLoop(ctx context.Context) { ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: if wm.isStopped { return } err := wm.ws.WriteMessage(websocket.PingMessage, []byte("ping")) if err != nil { log.Error("主动 Ping Binance 失败:", err) wm.triggerReconnect(false) } } } } // 定期删除listenkey 并重启ws // func (wm *BinanceWebSocketManager) startListenKeyRenewal(ctx context.Context, listenKey string) { // time.Sleep(30 * time.Minute) // select { // case <-ctx.Done(): // return // default: // if err := wm.deleteListenKey(listenKey); err != nil { // log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err) // } else { // log.Debug("Successfully delete listenKey") // wm.triggerReconnect() // } // } // } // 定时续期 func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) { ticker := time.NewTicker(30 * time.Minute) defer func() { log.Debug("定时续期任务退出 key:", wm.apiKey) ticker.Stop() }() for { select { case <-ticker.C: if wm.isStopped { return } if err := wm.renewListenKey(wm.listenKey); err != nil { log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err) } case <-ctx.Done(): return } } } /* 删除listenkey */ // func (wm *BinanceWebSocketManager) deleteListenKey(listenKey string) error { // client, err := wm.createBinanceClient() // if err != nil { // return err // } // var resp []byte // switch wm.wsType { // case 0: // path := fmt.Sprintf("/api/v3/userDataStream") // params := map[string]interface{}{ // "listenKey": listenKey, // } // resp, _, err = client.SendSpotRequestByKey(path, "DELETE", params) // log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp))) // case 1: // resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "DELETE", nil) // log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp))) // default: // return errors.New("unknown ws type") // } // return err // } func (wm *BinanceWebSocketManager) renewListenKey(listenKey string) error { client, err := wm.createBinanceClient() if err != nil { return err } var resp []byte switch wm.wsType { case 0: path := fmt.Sprintf("/api/v3/userDataStream?listenKey=%s", listenKey) resp, _, err = client.SendSpotRequestByKey(path, "PUT", nil) log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp))) case 1: // path := fmt.Sprintf("/fapi/v1/listenKey", listenKey) resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "PUT", nil) log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp))) default: return errors.New("unknown ws type") } return nil } func getWsTypeName(wsType int) string { switch wsType { case 0: return "spot" case 1: return "futures" default: return "unknown" } } func getUUID() string { return fmt.Sprintf("%s-%s-%s-%s-%s", randomHex(8), randomHex(4), randomHex(4), randomHex(4), randomHex(12)) } func randomHex(n int) string { rand.New(rand.NewSource(time.Now().UnixNano())) hexChars := "0123456789abcdef" bytes := make([]byte, n) for i := 0; i < n; i++ { bytes[i] = hexChars[rand.Intn(len(hexChars))] } return string(bytes) }