1暂时提交

This commit is contained in:
2025-08-11 09:27:32 +08:00
parent 56a761e5ab
commit 4b28684fe4
16 changed files with 980 additions and 209 deletions

View File

@ -118,6 +118,7 @@ func (wm *BinanceWebSocketManager) triggerReconnect(force bool) {
case wm.reconnect <- struct{}{}:
default:
// 防止阻塞,如果通道满了就跳过
log.Debugf("reconnect 信号已存在,跳过 key:%s", wm.apiKey)
}
}
}
@ -488,6 +489,7 @@ func (wm *BinanceWebSocketManager) handleOrderUpdate(msg []byte) {
}
}
// Stop 安全停止 WebSocket
func (wm *BinanceWebSocketManager) Stop() {
wm.mu.Lock()
defer wm.mu.Unlock()
@ -495,9 +497,8 @@ func (wm *BinanceWebSocketManager) Stop() {
if wm.isStopped {
return
}
wm.isStopped = true
// 关闭 stopChannel确保已经关闭避免 panic
select {
case <-wm.stopChannel:
default:
@ -506,69 +507,106 @@ func (wm *BinanceWebSocketManager) Stop() {
if wm.cancelFunc != nil {
wm.cancelFunc()
wm.cancelFunc = nil
}
if wm.ws != nil {
if err := wm.ws.Close(); err != nil {
log.Error(fmt.Sprintf("key【%s】close失败", wm.apiKey), err)
} else {
log.Info(fmt.Sprintf("key【%s】close", wm.apiKey))
log.Errorf("WebSocket Close 错误 key:%s err:%v", wm.apiKey, err)
}
wm.ws = nil
}
// **重新创建 stopChannel避免 Restart() 时无效**
wm.stopChannel = make(chan struct{})
log.Infof("WebSocket 已完全停止 key:%s", wm.apiKey)
wm.stopChannel = make(chan struct{}, 10)
}
// 重连机制
// handleReconnect 使用指数退避并保持永不退出
func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) {
maxRetries := 100 // 最大重试次数
const maxRetries = 100
baseDelay := time.Second * 2
retryCount := 0
for {
select {
case <-ctx.Done():
log.Infof("handleReconnect context done: %s", wm.apiKey)
return
case <-wm.reconnect:
wm.mu.Lock()
if wm.isStopped {
wm.mu.Unlock()
return
}
wm.mu.Unlock()
log.Warn("WebSocket 连接断开,尝试重连...")
if wm.ws != nil {
wm.ws.Close()
}
// 取消旧的上下文
if wm.cancelFunc != nil {
wm.cancelFunc()
}
log.Warnf("WebSocket 连接断开,准备重连 key:%s", wm.apiKey)
for {
wm.mu.Lock()
if wm.ws != nil {
_ = wm.ws.Close()
wm.ws = nil
}
if wm.cancelFunc != nil {
wm.cancelFunc()
wm.cancelFunc = nil
}
wm.mu.Unlock()
newCtx, cancel := context.WithCancel(context.Background())
wm.cancelFunc = cancel // 更新 cancelFunc
wm.mu.Lock()
wm.cancelFunc = cancel
wm.mu.Unlock()
if err := wm.connect(newCtx); err != nil {
log.Errorf("重连失败: %v", err)
log.Errorf("🔌 重连失败%d/%dkey:%serr: %v", retryCount+1, maxRetries, wm.apiKey, err)
cancel()
retryCount++
if retryCount >= maxRetries {
log.Errorf("❌ 重连失败次数过多,停止重连逻辑 key:%s", wm.apiKey)
wm.reconnecting.Store(false)
log.Error("重连失败次数过多,退出重连逻辑")
return
}
time.Sleep(5 * time.Second)
delay := baseDelay * time.Duration(1<<retryCount)
if delay > time.Minute*5 {
delay = time.Minute * 5
}
log.Warnf("等待 %v 后重试...", delay)
time.Sleep(delay)
continue
}
// 重连成功,清除标记
wm.reconnecting.Store(false)
log.Infof("✅ 重连成功 key:%s", wm.apiKey)
retryCount = 0
wm.reconnecting.Store(false)
// ✅ 重连成功后开启假死检测
utility.SafeGo(func() { wm.startDeadCheck(newCtx) })
break
}
}
}
}
// startDeadCheck 替代 Start 中的定时器,绑定连接生命周期
func (wm *BinanceWebSocketManager) startDeadCheck(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if wm.isStopped {
return
}
wm.DeadCheck()
}
}
}