Files
exchange_go/services/excservice/binancesocketmanager.go
2025-10-14 19:58:59 +08:00

853 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package excservice
import (
"context"
"errors"
"fmt"
"go-admin/common/global"
"go-admin/common/helper"
"go-admin/models/binancedto"
"go-admin/models/commondto"
"go-admin/pkg/jsonhelper"
"go-admin/pkg/utility"
"go-admin/services/proxy"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/bytedance/sonic"
log "github.com/go-admin-team/go-admin-core/logger"
"github.com/gorilla/websocket"
)
type BinanceWebSocketManager struct {
mu sync.Mutex
stopOnce sync.Once // 新增确保Stop只执行一次
ws *websocket.Conn
stopChannel chan struct{}
url string
wsType int
apiKey string
apiSecret string
proxyType string
proxyAddress string
reconnect chan struct{}
isStopped bool
cancelFunc context.CancelFunc
listenKey string
reconnecting atomic.Bool
ConnectTime time.Time
lastPingTime atomic.Value
forceReconnectTimer *time.Timer
}
// ... (省略中间代码)
// sendPing 使用锁来确保发送ping时的线程安全
func (wm *BinanceWebSocketManager) sendPing() error {
wm.mu.Lock()
defer wm.mu.Unlock()
if wm.ws == nil {
return errors.New("websocket connection is nil")
}
wm.lastPingTime.Store(time.Now())
return wm.ws.WriteMessage(websocket.PingMessage, []byte("ping"))
}
// Stop 优雅地停止WebSocket管理器
func (wm *BinanceWebSocketManager) Stop() {
wm.stopOnce.Do(func() {
wm.mu.Lock()
if wm.isStopped {
wm.mu.Unlock()
return
}
wm.isStopped = true
wm.mu.Unlock()
if wm.cancelFunc != nil {
wm.cancelFunc()
}
wm.closeConn() // 统一调用带锁的关闭方法
// 尝试关闭stopChannel如果已经关闭则忽略
select {
case <-wm.stopChannel:
default:
close(wm.stopChannel)
}
log.Infof("WebSocket 已完全停止 key:%s", wm.apiKey)
})
}
// handleReconnect 处理重连逻辑
func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) {
const maxRetries = 10
backoff := time.Second
for {
select {
case <-wm.reconnect:
if !wm.reconnecting.CompareAndSwap(false, true) {
continue // 如果已经在重连,则忽略
}
// 在开始重连循环之前,先安全地关闭旧的连接
wm.closeConn()
retryCount := 0
for retryCount < maxRetries {
select {
case <-wm.stopChannel:
wm.reconnecting.Store(false)
return
default:
}
log.Infof("WebSocket (%s) 正在尝试重连,第 %d 次...", wm.wsType, retryCount+1)
if err := wm.connect(ctx); err == nil {
log.Infof("WebSocket (%s) 重连成功", wm.wsType)
wm.reconnecting.Store(false)
go wm.startDeadCheck(ctx) // 重连成功后,重新启动假死检测
break // 跳出重连循环
}
retryCount++
time.Sleep(backoff)
backoff *= 2
if backoff > time.Minute {
backoff = time.Minute
}
}
if retryCount >= maxRetries {
log.Errorf("WebSocket (%s) 重连失败次数过多,停止重连", wm.wsType)
wm.Stop()
return
}
case <-ctx.Done():
wm.reconnecting.Store(false)
return
}
}
}
// 已有连接
var SpotSockets = map[string]*BinanceWebSocketManager{}
var FutureSockets = map[string]*BinanceWebSocketManager{}
/**
* 创建新的Binance WebSocket管理器
* @param wsType WebSocket类型0-现货1-合约
* @param apiKey API密钥
* @param apiSecret API密钥
* @param proxyType 代理类型
* @param proxyAddress 代理地址
* @return WebSocket管理器实例
*/
func NewBinanceWebSocketManager(wsType int, apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager {
url := ""
switch wsType {
case 0:
url = "wss://stream.binance.com:9443/ws"
case 1:
url = "wss://fstream.binance.com/ws"
}
wm := &BinanceWebSocketManager{
stopChannel: make(chan struct{}, 10),
reconnect: make(chan struct{}, 10),
isStopped: false,
url: url,
wsType: wsType,
apiKey: apiKey,
apiSecret: apiSecret,
proxyType: proxyType,
proxyAddress: proxyAddress,
}
// 初始化最后ping时间
wm.lastPingTime.Store(time.Now())
return wm
}
/**
* 获取API密钥
* @return API密钥
*/
func (wm *BinanceWebSocketManager) GetKey() string {
return wm.apiKey
}
/**
* 启动WebSocket连接
*/
func (wm *BinanceWebSocketManager) Start() {
utility.SafeGo(wm.run)
}
/**
* 重启连接,更新配置参数
* @param apiKey 新的API密钥
* @param apiSecret 新的API密钥
* @param proxyType 新的代理类型
* @param proxyAddress 新的代理地址
* @return WebSocket管理器实例
*/
func (wm *BinanceWebSocketManager) Restart(apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager {
wm.mu.Lock()
defer wm.mu.Unlock()
wm.apiKey = apiKey
wm.apiSecret = apiSecret
wm.proxyType = proxyType
wm.proxyAddress = proxyAddress
if wm.isStopped {
wm.isStopped = false
utility.SafeGo(wm.run)
} else {
log.Warnf("调用restart")
wm.triggerReconnect(true)
}
return wm
}
/**
* 触发重连机制
* @param force 是否强制重连
*/
func (wm *BinanceWebSocketManager) triggerReconnect(force bool) {
if force {
wm.reconnecting.Store(false) // 强制重置标志位
}
if wm.reconnecting.CompareAndSwap(false, true) {
log.Warnf("准备重连 key: %s wsType: %v", wm.apiKey, wm.wsType)
// 发送信号触发重连协程
select {
case wm.reconnect <- struct{}{}:
default:
// 防止阻塞,如果通道满了就跳过
log.Debugf("reconnect 信号已存在,跳过 key:%s", wm.apiKey)
}
}
}
/**
* 外部重启函数
* @param wm WebSocket管理器实例
*/
func Restart(wm *BinanceWebSocketManager) {
log.Warnf("调用restart")
wm.triggerReconnect(true)
}
/**
* 主运行循环
*/
func (wm *BinanceWebSocketManager) run() {
ctx, cancel := context.WithCancel(context.Background())
wm.cancelFunc = cancel
// utility.SafeGo(wm.handleSignal)
// 计算错误记录键
errKey := fmt.Sprintf(global.API_WEBSOCKET_ERR, wm.apiKey)
errMessage := commondto.WebSocketErr{Time: time.Now()}
helper.DefaultRedis.SetString(errKey, jsonhelper.ToJsonString(errMessage))
for {
select {
case <-ctx.Done():
return
default:
if err := wm.connect(ctx); err != nil {
wm.handleConnectionError(errKey, err)
if wm.isErrorCountExceeded(errKey) {
log.Error("连接 %s WebSocket 时出错次数过多,停止 WebSocket 管理器: %v", wm.wsType, wm.apiKey)
wm.Stop()
return
}
time.Sleep(5 * time.Second)
continue
}
<-wm.stopChannel
log.Info("停止 %s WebSocket 管理器...", getWsTypeName(wm.wsType))
wm.Stop()
return
}
}
}
/**
* 处理WebSocket连接错误
* @param errKey Redis错误记录键
* @param err 错误信息
*/
func (wm *BinanceWebSocketManager) handleConnectionError(errKey string, err error) {
// 从 Redis 获取错误记录
var errMessage commondto.WebSocketErr
val, _ := helper.DefaultRedis.GetString(errKey)
if val != "" {
sonic.UnmarshalString(val, &errMessage)
}
// 更新错误记录
errMessage.Count++
errMessage.Time = time.Now()
errMessage.ErrorMessage = err.Error()
// 将错误记录保存到 Redis
if data, err := sonic.MarshalString(errMessage); err == nil {
helper.DefaultRedis.SetString(errKey, data)
}
// 记录错误日志
log.Error("连接 %s WebSocket 时出错: %v, 错误: %v", wm.wsType, wm.apiKey, err)
}
/**
* 检查错误次数是否超过阈值
* @param errKey Redis错误记录键
* @return 是否超过阈值
*/
func (wm *BinanceWebSocketManager) isErrorCountExceeded(errKey string) bool {
val, _ := helper.DefaultRedis.GetString(errKey)
if val == "" {
return false
}
var errMessage commondto.WebSocketErr
if err := sonic.UnmarshalString(val, &errMessage); err != nil {
return false
}
return errMessage.Count >= 5
}
/**
* 建立WebSocket连接
* @param ctx 上下文
* @return 错误信息
*/
func (wm *BinanceWebSocketManager) connect(ctx context.Context) error {
dialer, err := proxy.GetDialer(wm.proxyType, wm.proxyAddress)
if err != nil {
return err
}
listenKey, err := wm.getListenKey()
if err != nil {
return err
}
wm.listenKey = listenKey
url := fmt.Sprintf("%s/%s", wm.url, listenKey)
wm.ws, _, err = dialer.Dial(url, nil)
if err != nil {
return err
}
// 连接成功更新连接时间和ping时间
wm.ConnectTime = time.Now()
wm.lastPingTime.Store(time.Now())
log.Info(fmt.Sprintf("已连接到 Binance %s WebSocket【%s】 key:%s", getWsTypeName(wm.wsType), wm.apiKey, listenKey))
// Ping处理
wm.ws.SetPingHandler(func(appData string) error {
log.Info(fmt.Sprintf("收到 wstype: %v key:%s Ping 消息【%s】", wm.wsType, wm.apiKey, appData))
for x := 0; x < 5; x++ {
if err := wm.ws.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(time.Second*10)); err != nil {
log.Error("binance 回应pong失败 次数:", strconv.Itoa(x), " err:", err)
time.Sleep(time.Second * 1)
continue
}
break
}
// 更新ping时间和Redis记录
wm.lastPingTime.Store(time.Now())
setLastTime(wm)
return nil
})
// 启动各种协程
utility.SafeGo(func() { wm.startListenKeyRenewal2(ctx) })
utility.SafeGo(func() { wm.readMessages(ctx) })
utility.SafeGo(func() { wm.handleReconnect(ctx) })
utility.SafeGo(func() { wm.startPingLoop(ctx) })
utility.SafeGo(func() { wm.startDeadCheck(ctx) }) // 连接成功后立即启动假死检测
utility.SafeGo(func() { wm.start24HourReconnectTimer(ctx) }) // 启动24小时强制重连
return nil
}
// ReplaceConnection 创建新连接并关闭旧连接,实现无缝连接替换
func (wm *BinanceWebSocketManager) ReplaceConnection() error {
wm.mu.Lock()
if wm.isStopped {
wm.mu.Unlock()
return errors.New("WebSocket 已停止")
}
oldCtxCancel := wm.cancelFunc
oldConn := wm.ws
wm.mu.Unlock()
log.Infof("🔄 正在替换连接: %s", wm.apiKey)
// 步骤 1先获取新的 listenKey 和连接
newListenKey, err := wm.getListenKey()
if err != nil {
return fmt.Errorf("获取新 listenKey 失败: %w", err)
}
dialer, err := proxy.GetDialer(wm.proxyType, wm.proxyAddress)
if err != nil {
return err
}
newURL := fmt.Sprintf("%s/%s", wm.url, newListenKey)
newConn, _, err := dialer.Dial(newURL, nil)
if err != nil {
return fmt.Errorf("新连接 Dial 失败: %w", err)
}
// 步骤 2创建新的上下文并启动协程
newCtx, newCancel := context.WithCancel(context.Background())
// 设置 ping handler
newConn.SetPingHandler(func(appData string) error {
log.Infof("收到 Ping新连接 key:%s msg:%s", wm.apiKey, appData)
for x := 0; x < 5; x++ {
if err := newConn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(10*time.Second)); err != nil {
log.Errorf("Pong 失败 %d 次 err:%v", x, err)
time.Sleep(time.Second)
continue
}
break
}
setLastTime(wm)
return nil
})
// 步骤 3安全切换连接
wm.mu.Lock()
wm.ws = newConn
wm.listenKey = newListenKey
wm.ConnectTime = time.Now()
wm.cancelFunc = newCancel
wm.mu.Unlock()
log.Infof("✅ 替换连接成功: %s listenKey: %s", wm.apiKey, newListenKey)
// 步骤 4启动新连接协程
go wm.startListenKeyRenewal2(newCtx)
go wm.readMessages(newCtx)
go wm.handleReconnect(newCtx)
go wm.startPingLoop(newCtx)
// go wm.startDeadCheck(newCtx)
// 步骤 5关闭旧连接、取消旧协程
if oldCtxCancel != nil {
oldCtxCancel()
}
if oldConn != nil {
_ = oldConn.Close()
log.Infof("🔒 旧连接已关闭: %s", wm.apiKey)
}
return nil
}
// 更新最后通信时间
func setLastTime(wm *BinanceWebSocketManager) {
subKey := fmt.Sprintf(global.USER_SUBSCRIBE, wm.apiKey)
val, _ := helper.DefaultRedis.GetString(subKey)
now := time.Now()
var data binancedto.UserSubscribeState
if val != "" {
sonic.Unmarshal([]byte(val), &data)
}
if wm.wsType == 0 {
data.SpotLastTime = &now
} else {
data.FuturesLastTime = &now
}
val, _ = sonic.MarshalString(&data)
if val != "" {
helper.DefaultRedis.SetString(subKey, val)
}
}
// 复用创建HTTP客户端的逻辑
func (wm *BinanceWebSocketManager) createBinanceClient() (*helper.BinanceClient, error) {
return helper.NewBinanceClient(wm.apiKey, wm.apiSecret, wm.proxyType, wm.proxyAddress)
}
// 获取listenKey
func (wm *BinanceWebSocketManager) getListenKey() (string, error) {
client, err := wm.createBinanceClient()
if err != nil {
return "", err
}
var resp []byte
switch wm.wsType {
case 0:
resp, _, err = client.SendSpotRequestByKey("/api/v3/userDataStream", "POST", nil)
case 1:
resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "POST", nil)
default:
log.Error("链接类型错误")
return "", errors.New("链接类型错误")
}
if err != nil {
return "", err
}
var dataMap map[string]interface{}
if err := sonic.Unmarshal(resp, &dataMap); err != nil {
return "", err
}
if listenKey, ok := dataMap["listenKey"]; ok {
return listenKey.(string), nil
}
return "", errors.New("listenKey 不存在")
}
// 接收消息
func (wm *BinanceWebSocketManager) readMessages(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
if wm.isStopped {
return
}
_, msg, err := wm.ws.ReadMessage()
if err != nil {
// 检查是否是由于我们主动关闭连接导致的错误
if wm.isStopped {
log.Infof("WebSocket read loop gracefully stopped for key: %s", wm.apiKey)
return
}
// 如果不是主动关闭,再判断是否是连接关闭错误,并触发重连
if strings.Contains(err.Error(), "websocket: close") {
log.Error("WebSocket connection closed unexpectedly, triggering reconnect for key: %s. Error: %v", wm.apiKey, err)
wm.triggerReconnect(false)
} else {
log.Error("Error reading message for key: %s. Error: %v", wm.apiKey, err)
}
return // 任何错误发生后都应退出读取循环
}
wm.handleOrderUpdate(msg)
}
}
}
func (wm *BinanceWebSocketManager) handleOrderUpdate(msg []byte) {
setLastTime(wm)
if reconnect, fatal, err := ReceiveListen(msg, wm.wsType, wm.apiKey); err != nil {
log.Errorf("处理消息时出错: %v", err)
// 根据错误类型决定是否停止
if fatal {
log.Errorf("收到致命错误,将停止 WebSocket 管理器 for key: %s", wm.apiKey)
wm.Stop()
}
return
} else if fatal {
log.Errorf("收到致命错误,将停止 WebSocket 管理器 for key: %s", wm.apiKey)
wm.Stop()
return
} else if reconnect {
log.Warnf("收到重连请求,将触发重连 for key: %s", wm.apiKey)
wm.triggerReconnect(false)
}
}
// closeConn 安全地关闭WebSocket连接
func (wm *BinanceWebSocketManager) closeConn() {
wm.mu.Lock()
defer wm.mu.Unlock()
if wm.ws != nil {
_ = wm.ws.Close()
wm.ws = nil
}
}
// 主动心跳发送机制
func (wm *BinanceWebSocketManager) startPingLoop(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if wm.isStopped {
return
}
if err := wm.sendPing(); err != nil {
log.Error("主动 Ping Binance 失败:", err)
wm.triggerReconnect(false)
return // 退出循环,等待重连
}
}
}
}
// 定期删除listenkey 并重启ws
// func (wm *BinanceWebSocketManager) startListenKeyRenewal(ctx context.Context, listenKey string) {
// time.Sleep(30 * time.Minute)
// select {
// case <-ctx.Done():
// return
// default:
// if err := wm.deleteListenKey(listenKey); err != nil {
// log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
// } else {
// log.Debug("Successfully delete listenKey")
// wm.triggerReconnect()
// }
// }
// }
// 定时续期
func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) {
ticker := time.NewTicker(30 * time.Minute)
defer func() {
log.Debug("定时续期任务退出 key:", wm.apiKey)
ticker.Stop()
}()
for {
select {
case <-ticker.C:
if wm.isStopped {
return
}
if err := wm.renewListenKey(wm.listenKey); err != nil {
log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
}
case <-ctx.Done():
return
}
}
}
/*
删除listenkey
*/
// func (wm *BinanceWebSocketManager) deleteListenKey(listenKey string) error {
// client, err := wm.createBinanceClient()
// if err != nil {
// return err
// }
// var resp []byte
// switch wm.wsType {
// case 0:
// path := fmt.Sprintf("/api/v3/userDataStream")
// params := map[string]interface{}{
// "listenKey": listenKey,
// }
// resp, _, err = client.SendSpotRequestByKey(path, "DELETE", params)
// log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
// case 1:
// resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "DELETE", nil)
// log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
// default:
// return errors.New("unknown ws type")
// }
// return err
// }
func (wm *BinanceWebSocketManager) renewListenKey(listenKey string) error {
client, err := wm.createBinanceClient()
if err != nil {
return err
}
var resp []byte
switch wm.wsType {
case 0:
path := fmt.Sprintf("/api/v3/userDataStream?listenKey=%s", listenKey)
resp, _, err = client.SendSpotRequestByKey(path, "PUT", nil)
log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp)))
case 1:
// path := fmt.Sprintf("/fapi/v1/listenKey", listenKey)
resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "PUT", nil)
log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp)))
default:
return errors.New("unknown ws type")
}
return nil
}
func getWsTypeName(wsType int) string {
switch wsType {
case 0:
return "spot"
case 1:
return "futures"
default:
return "unknown"
}
}
func getUUID() string {
return fmt.Sprintf("%s-%s-%s-%s-%s", randomHex(8), randomHex(4), randomHex(4), randomHex(4), randomHex(12))
}
func randomHex(n int) string {
rand.New(rand.NewSource(time.Now().UnixNano()))
hexChars := "0123456789abcdef"
bytes := make([]byte, n)
for i := 0; i < n; i++ {
bytes[i] = hexChars[rand.Intn(len(hexChars))]
}
return string(bytes)
}
/**
* 启动24小时强制重连定时器
* @param ctx 上下文
*/
func (wm *BinanceWebSocketManager) start24HourReconnectTimer(ctx context.Context) {
// Binance要求不到24小时就需要主动断开重连
// 设置为23小时50分钟留出一些缓冲时间
duration := 23*time.Hour + 50*time.Minute
wm.forceReconnectTimer = time.NewTimer(duration)
defer func() {
if wm.forceReconnectTimer != nil {
wm.forceReconnectTimer.Stop()
}
}()
select {
case <-ctx.Done():
return
case <-wm.forceReconnectTimer.C:
if !wm.isStopped {
log.Warnf("24小时强制重连触发 key:%s wsType:%v", wm.apiKey, wm.wsType)
wm.triggerReconnect(true)
}
}
}
/**
* 改进的假死检测机制
* @param ctx 上下文
*/
func (wm *BinanceWebSocketManager) startDeadCheck(ctx context.Context) {
// 缩短检测间隔,提高响应速度
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if wm.isStopped {
return
}
wm.DeadCheck()
}
}
}
/**
* 改进的假死检测逻辑
*/
func (wm *BinanceWebSocketManager) DeadCheck() {
// 检查最后ping时间
lastPing := wm.lastPingTime.Load().(time.Time)
// 定义最大静默时间根据WebSocket类型调整
var timeout time.Duration
if wm.wsType == 0 {
timeout = 3 * time.Minute // 现货3分钟无ping视为假死
} else {
timeout = 6 * time.Minute // 合约6分钟无ping视为假死
}
if time.Since(lastPing) > timeout {
log.Warnf("检测到假死连接(基于ping时间) key:%s type:%v, 距离上次ping: %v, 触发重连",
wm.apiKey, wm.wsType, time.Since(lastPing))
wm.triggerReconnect(true)
return
}
// 同时检查Redis中的记录作为备用检测
subKey := fmt.Sprintf(global.USER_SUBSCRIBE, wm.apiKey)
val, _ := helper.DefaultRedis.GetString(subKey)
if val == "" {
log.Debugf("没有订阅信息跳过Redis假死检测")
return
}
var data binancedto.UserSubscribeState
_ = sonic.Unmarshal([]byte(val), &data)
var lastTime *time.Time
if wm.wsType == 0 {
lastTime = data.SpotLastTime
} else {
lastTime = data.FuturesLastTime
}
// Redis记录的超时时间设置得更长一些
var redisTimeout time.Duration
if wm.wsType == 0 {
redisTimeout = 3 * time.Minute
} else {
redisTimeout = 6 * time.Minute
}
if lastTime != nil && time.Since(*lastTime) > redisTimeout {
log.Warnf("检测到假死连接(基于Redis记录) key:%s type:%v, 距离上次通信: %v, 触发重连",
wm.apiKey, wm.wsType, time.Since(*lastTime))
wm.triggerReconnect(true)
}
}
/**
* 优化的重连处理逻辑
* @param ctx 上下文
*/