601 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			601 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| 
								 | 
							
								package excservice
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import (
							 | 
						||
| 
								 | 
							
									"context"
							 | 
						||
| 
								 | 
							
									"crypto/tls"
							 | 
						||
| 
								 | 
							
									"errors"
							 | 
						||
| 
								 | 
							
									"fmt"
							 | 
						||
| 
								 | 
							
									"go-admin/common/global"
							 | 
						||
| 
								 | 
							
									"go-admin/common/helper"
							 | 
						||
| 
								 | 
							
									"go-admin/models/binancedto"
							 | 
						||
| 
								 | 
							
									"go-admin/models/commondto"
							 | 
						||
| 
								 | 
							
									"go-admin/pkg/jsonhelper"
							 | 
						||
| 
								 | 
							
									"go-admin/pkg/utility"
							 | 
						||
| 
								 | 
							
									"math/rand"
							 | 
						||
| 
								 | 
							
									"net"
							 | 
						||
| 
								 | 
							
									"net/http"
							 | 
						||
| 
								 | 
							
									"net/url"
							 | 
						||
| 
								 | 
							
									"os"
							 | 
						||
| 
								 | 
							
									"os/signal"
							 | 
						||
| 
								 | 
							
									"strconv"
							 | 
						||
| 
								 | 
							
									"strings"
							 | 
						||
| 
								 | 
							
									"sync"
							 | 
						||
| 
								 | 
							
									"time"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									"github.com/bytedance/sonic"
							 | 
						||
| 
								 | 
							
									log "github.com/go-admin-team/go-admin-core/logger"
							 | 
						||
| 
								 | 
							
									"github.com/gorilla/websocket"
							 | 
						||
| 
								 | 
							
									"golang.org/x/net/proxy"
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								type BinanceWebSocketManager struct {
							 | 
						||
| 
								 | 
							
									ws          *websocket.Conn
							 | 
						||
| 
								 | 
							
									stopChannel chan struct{}
							 | 
						||
| 
								 | 
							
									url         string
							 | 
						||
| 
								 | 
							
									/* 0-现货 1-合约 */
							 | 
						||
| 
								 | 
							
									wsType       int
							 | 
						||
| 
								 | 
							
									apiKey       string
							 | 
						||
| 
								 | 
							
									apiSecret    string
							 | 
						||
| 
								 | 
							
									proxyType    string
							 | 
						||
| 
								 | 
							
									proxyAddress string
							 | 
						||
| 
								 | 
							
									reconnect    chan struct{}
							 | 
						||
| 
								 | 
							
									isStopped    bool       // 标记 WebSocket 是否已主动停止
							 | 
						||
| 
								 | 
							
									mu           sync.Mutex // 用于控制并发访问 isStopped
							 | 
						||
| 
								 | 
							
									cancelFunc   context.CancelFunc
							 | 
						||
| 
								 | 
							
									listenKey    string // 新增字段
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// 已有连接
							 | 
						||
| 
								 | 
							
								var SpotSockets = map[string]*BinanceWebSocketManager{}
							 | 
						||
| 
								 | 
							
								var FutureSockets = map[string]*BinanceWebSocketManager{}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func NewBinanceWebSocketManager(wsType int, apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager {
							 | 
						||
| 
								 | 
							
									url := ""
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									switch wsType {
							 | 
						||
| 
								 | 
							
									case 0:
							 | 
						||
| 
								 | 
							
										url = "wss://stream.binance.com:9443/ws"
							 | 
						||
| 
								 | 
							
									case 1:
							 | 
						||
| 
								 | 
							
										url = "wss://fstream.binance.com/ws"
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return &BinanceWebSocketManager{
							 | 
						||
| 
								 | 
							
										stopChannel:  make(chan struct{}, 10),
							 | 
						||
| 
								 | 
							
										reconnect:    make(chan struct{}, 10),
							 | 
						||
| 
								 | 
							
										isStopped:    false,
							 | 
						||
| 
								 | 
							
										url:          url,
							 | 
						||
| 
								 | 
							
										wsType:       wsType,
							 | 
						||
| 
								 | 
							
										apiKey:       apiKey,
							 | 
						||
| 
								 | 
							
										apiSecret:    apiSecret,
							 | 
						||
| 
								 | 
							
										proxyType:    proxyType,
							 | 
						||
| 
								 | 
							
										proxyAddress: proxyAddress,
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) Start() {
							 | 
						||
| 
								 | 
							
									utility.SafeGo(wm.run)
							 | 
						||
| 
								 | 
							
									// wm.run()
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// 重启连接
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) Restart(apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager {
							 | 
						||
| 
								 | 
							
									wm.apiKey = apiKey
							 | 
						||
| 
								 | 
							
									wm.apiSecret = apiSecret
							 | 
						||
| 
								 | 
							
									wm.proxyType = proxyType
							 | 
						||
| 
								 | 
							
									wm.proxyAddress = proxyAddress
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									wm.reconnect <- struct{}{}
							 | 
						||
| 
								 | 
							
									return wm
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func Restart(wm *BinanceWebSocketManager) {
							 | 
						||
| 
								 | 
							
									wm.reconnect <- struct{}{}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) run() {
							 | 
						||
| 
								 | 
							
									ctx, cancel := context.WithCancel(context.Background())
							 | 
						||
| 
								 | 
							
									wm.cancelFunc = cancel
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									utility.SafeGo(wm.handleSignal)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// 计算错误记录键
							 | 
						||
| 
								 | 
							
									errKey := fmt.Sprintf(global.API_WEBSOCKET_ERR, wm.apiKey)
							 | 
						||
| 
								 | 
							
									errMessage := commondto.WebSocketErr{Time: time.Now()}
							 | 
						||
| 
								 | 
							
									helper.DefaultRedis.SetString(errKey, jsonhelper.ToJsonString(errMessage))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									for {
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										case <-ctx.Done():
							 | 
						||
| 
								 | 
							
											return
							 | 
						||
| 
								 | 
							
										default:
							 | 
						||
| 
								 | 
							
											if err := wm.connect(ctx); err != nil {
							 | 
						||
| 
								 | 
							
												wm.handleConnectionError(errKey, err)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
												if wm.isErrorCountExceeded(errKey) {
							 | 
						||
| 
								 | 
							
													log.Error("连接 %s WebSocket 时出错次数过多,停止 WebSocket 管理器: %v", wm.wsType, wm.apiKey)
							 | 
						||
| 
								 | 
							
													wm.Stop()
							 | 
						||
| 
								 | 
							
													return
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
												time.Sleep(5 * time.Second)
							 | 
						||
| 
								 | 
							
												continue
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											<-wm.stopChannel
							 | 
						||
| 
								 | 
							
											log.Info("停止 %s WebSocket 管理器...", getWsTypeName(wm.wsType))
							 | 
						||
| 
								 | 
							
											wm.Stop()
							 | 
						||
| 
								 | 
							
											return
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// handleConnectionError 处理 WebSocket 连接错误
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) handleConnectionError(errKey string, err error) {
							 | 
						||
| 
								 | 
							
									// 从 Redis 获取错误记录
							 | 
						||
| 
								 | 
							
									var errMessage commondto.WebSocketErr
							 | 
						||
| 
								 | 
							
									val, _ := helper.DefaultRedis.GetString(errKey)
							 | 
						||
| 
								 | 
							
									if val != "" {
							 | 
						||
| 
								 | 
							
										sonic.UnmarshalString(val, &errMessage)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// 更新错误记录
							 | 
						||
| 
								 | 
							
									errMessage.Count++
							 | 
						||
| 
								 | 
							
									errMessage.Time = time.Now()
							 | 
						||
| 
								 | 
							
									errMessage.ErrorMessage = err.Error()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// 将错误记录保存到 Redis
							 | 
						||
| 
								 | 
							
									if data, err := sonic.MarshalString(errMessage); err == nil {
							 | 
						||
| 
								 | 
							
										helper.DefaultRedis.SetString(errKey, data)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// 记录错误日志
							 | 
						||
| 
								 | 
							
									log.Error("连接 %s WebSocket 时出错: %v, 错误: %v", wm.wsType, wm.apiKey, err)
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// isErrorCountExceeded 检查错误次数是否超过阈值
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) isErrorCountExceeded(errKey string) bool {
							 | 
						||
| 
								 | 
							
									val, _ := helper.DefaultRedis.GetString(errKey)
							 | 
						||
| 
								 | 
							
									if val == "" {
							 | 
						||
| 
								 | 
							
										return false
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									var errMessage commondto.WebSocketErr
							 | 
						||
| 
								 | 
							
									if err := sonic.UnmarshalString(val, &errMessage); err != nil {
							 | 
						||
| 
								 | 
							
										return false
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return errMessage.Count >= 5
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// 处理终止信号
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) handleSignal() {
							 | 
						||
| 
								 | 
							
									ch := make(chan os.Signal)
							 | 
						||
| 
								 | 
							
									signal.Notify(ch, os.Interrupt)
							 | 
						||
| 
								 | 
							
									<-ch
							 | 
						||
| 
								 | 
							
									wm.Stop()
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) connect(ctx context.Context) error {
							 | 
						||
| 
								 | 
							
									dialer, err := wm.getDialer()
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									listenKey, err := wm.getListenKey()
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									wm.listenKey = listenKey
							 | 
						||
| 
								 | 
							
									url := fmt.Sprintf("%s/%s", wm.url, listenKey)
							 | 
						||
| 
								 | 
							
									wm.ws, _, err = dialer.Dial(url, nil)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									log.Info(fmt.Sprintf("已连接到 Binance %s WebSocket【%s】 key:%s", getWsTypeName(wm.wsType), wm.apiKey, listenKey))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// Ping处理
							 | 
						||
| 
								 | 
							
									wm.ws.SetPingHandler(func(appData string) error {
							 | 
						||
| 
								 | 
							
										log.Info(fmt.Sprintf("收到 wstype: %v key:%s Ping 消息【%s】", wm.wsType, wm.apiKey, appData))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										for x := 0; x < 5; x++ {
							 | 
						||
| 
								 | 
							
											if err := wm.ws.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(time.Second*10)); err != nil {
							 | 
						||
| 
								 | 
							
												log.Error("binance 回应pong失败 次数:", strconv.Itoa(x), " err:", err)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
												time.Sleep(time.Second * 1)
							 | 
						||
| 
								 | 
							
												continue
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											break
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										setLastTime(wm)
							 | 
						||
| 
								 | 
							
										return nil
							 | 
						||
| 
								 | 
							
									})
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// utility.SafeGoParam(wm.restartConnect, ctx)
							 | 
						||
| 
								 | 
							
									utility.SafeGo(func() { wm.startListenKeyRenewal2(ctx) })
							 | 
						||
| 
								 | 
							
									utility.SafeGo(func() { wm.readMessages(ctx) })
							 | 
						||
| 
								 | 
							
									utility.SafeGo(func() { wm.handleReconnect(ctx) })
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// 更新最后通信时间
							 | 
						||
| 
								 | 
							
								func setLastTime(wm *BinanceWebSocketManager) {
							 | 
						||
| 
								 | 
							
									subKey := fmt.Sprintf(global.USER_SUBSCRIBE, wm.apiKey)
							 | 
						||
| 
								 | 
							
									val, _ := helper.DefaultRedis.GetString(subKey)
							 | 
						||
| 
								 | 
							
									now := time.Now()
							 | 
						||
| 
								 | 
							
									var data binancedto.UserSubscribeState
							 | 
						||
| 
								 | 
							
									if val != "" {
							 | 
						||
| 
								 | 
							
										sonic.Unmarshal([]byte(val), &data)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if wm.wsType == 0 {
							 | 
						||
| 
								 | 
							
										data.SpotLastTime = &now
							 | 
						||
| 
								 | 
							
									} else {
							 | 
						||
| 
								 | 
							
										data.FuturesLastTime = &now
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									val, _ = sonic.MarshalString(&data)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if val != "" {
							 | 
						||
| 
								 | 
							
										helper.DefaultRedis.SetString(subKey, val)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) getDialer() (*websocket.Dialer, error) {
							 | 
						||
| 
								 | 
							
									if wm.proxyAddress == "" {
							 | 
						||
| 
								 | 
							
										return &websocket.Dialer{}, nil
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if !strings.HasPrefix(wm.proxyAddress, "http://") && !strings.HasPrefix(wm.proxyAddress, "https://") && !strings.HasPrefix(wm.proxyAddress, "socks5://") {
							 | 
						||
| 
								 | 
							
										wm.proxyAddress = wm.proxyType + "://" + wm.proxyAddress
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									proxyURL, err := url.Parse(wm.proxyAddress)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return nil, fmt.Errorf("failed to parse proxy URL: %v", err)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									transport := &http.Transport{
							 | 
						||
| 
								 | 
							
										TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									switch proxyURL.Scheme {
							 | 
						||
| 
								 | 
							
									case "socks5":
							 | 
						||
| 
								 | 
							
										return wm.createSocks5Dialer(proxyURL)
							 | 
						||
| 
								 | 
							
									case "http", "https":
							 | 
						||
| 
								 | 
							
										transport.Proxy = http.ProxyURL(proxyURL)
							 | 
						||
| 
								 | 
							
										return &websocket.Dialer{Proxy: transport.Proxy, TLSClientConfig: transport.TLSClientConfig}, nil
							 | 
						||
| 
								 | 
							
									default:
							 | 
						||
| 
								 | 
							
										return nil, fmt.Errorf("unsupported proxy scheme: %s", proxyURL.Scheme)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) createSocks5Dialer(proxyURL *url.URL) (*websocket.Dialer, error) {
							 | 
						||
| 
								 | 
							
									auth := &proxy.Auth{}
							 | 
						||
| 
								 | 
							
									if proxyURL.User != nil {
							 | 
						||
| 
								 | 
							
										auth.User = proxyURL.User.Username()
							 | 
						||
| 
								 | 
							
										auth.Password, _ = proxyURL.User.Password()
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									socksDialer, err := proxy.SOCKS5("tcp", proxyURL.Host, auth, proxy.Direct)
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return nil, fmt.Errorf("failed to create SOCKS5 proxy dialer: %v", err)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return &websocket.Dialer{
							 | 
						||
| 
								 | 
							
										NetDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
							 | 
						||
| 
								 | 
							
											return socksDialer.Dial(network, address)
							 | 
						||
| 
								 | 
							
										},
							 | 
						||
| 
								 | 
							
									}, nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// 复用创建HTTP客户端的逻辑
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) createBinanceClient() (*helper.BinanceClient, error) {
							 | 
						||
| 
								 | 
							
									return helper.NewBinanceClient(wm.apiKey, wm.apiSecret, wm.proxyType, wm.proxyAddress)
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// 获取listenKey
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) getListenKey() (string, error) {
							 | 
						||
| 
								 | 
							
									client, err := wm.createBinanceClient()
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return "", err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									var resp []byte
							 | 
						||
| 
								 | 
							
									switch wm.wsType {
							 | 
						||
| 
								 | 
							
									case 0:
							 | 
						||
| 
								 | 
							
										resp, _, err = client.SendSpotRequestByKey("/api/v3/userDataStream", "POST", nil)
							 | 
						||
| 
								 | 
							
									case 1:
							 | 
						||
| 
								 | 
							
										resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "POST", nil)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									default:
							 | 
						||
| 
								 | 
							
										log.Error("链接类型错误")
							 | 
						||
| 
								 | 
							
										return "", errors.New("链接类型错误")
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return "", err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									var dataMap map[string]interface{}
							 | 
						||
| 
								 | 
							
									if err := sonic.Unmarshal(resp, &dataMap); err != nil {
							 | 
						||
| 
								 | 
							
										return "", err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if listenKey, ok := dataMap["listenKey"]; ok {
							 | 
						||
| 
								 | 
							
										return listenKey.(string), nil
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return "", errors.New("listenKey 不存在")
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// 接收消息
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) readMessages(ctx context.Context) {
							 | 
						||
| 
								 | 
							
									defer wm.ws.Close()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									for {
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										case <-ctx.Done():
							 | 
						||
| 
								 | 
							
											return
							 | 
						||
| 
								 | 
							
										default:
							 | 
						||
| 
								 | 
							
											if wm.isStopped {
							 | 
						||
| 
								 | 
							
												return
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											_, msg, err := wm.ws.ReadMessage()
							 | 
						||
| 
								 | 
							
											if err != nil && strings.Contains(err.Error(), "websocket: close") {
							 | 
						||
| 
								 | 
							
												if !wm.isStopped {
							 | 
						||
| 
								 | 
							
													wm.reconnect <- struct{}{}
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
												log.Error("websocket 关闭")
							 | 
						||
| 
								 | 
							
												return
							 | 
						||
| 
								 | 
							
											} else if err != nil {
							 | 
						||
| 
								 | 
							
												log.Error("读取消息时出错: %v", err)
							 | 
						||
| 
								 | 
							
												return
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
											wm.handleOrderUpdate(msg)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) handleOrderUpdate(msg []byte) {
							 | 
						||
| 
								 | 
							
									setLastTime(wm)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if reconnect, _ := ReceiveListen(msg, wm.wsType); reconnect {
							 | 
						||
| 
								 | 
							
										wm.reconnect <- struct{}{}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) Stop() {
							 | 
						||
| 
								 | 
							
									wm.mu.Lock()
							 | 
						||
| 
								 | 
							
									defer wm.mu.Unlock()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if wm.isStopped {
							 | 
						||
| 
								 | 
							
										return
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									wm.isStopped = true
							 | 
						||
| 
								 | 
							
									close(wm.stopChannel)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if wm.cancelFunc != nil {
							 | 
						||
| 
								 | 
							
										wm.cancelFunc()
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									if wm.ws != nil {
							 | 
						||
| 
								 | 
							
										if err := wm.ws.Close(); err != nil {
							 | 
						||
| 
								 | 
							
											log.Error(fmt.Sprintf("key【%s】close失败", wm.apiKey), err)
							 | 
						||
| 
								 | 
							
										} else {
							 | 
						||
| 
								 | 
							
											log.Info(fmt.Sprintf("key【%s】close", wm.apiKey))
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// 重连机制
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) {
							 | 
						||
| 
								 | 
							
									maxRetries := 5 // 最大重试次数
							 | 
						||
| 
								 | 
							
									retryCount := 0
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									for {
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										case <-ctx.Done():
							 | 
						||
| 
								 | 
							
											return
							 | 
						||
| 
								 | 
							
										case <-wm.reconnect:
							 | 
						||
| 
								 | 
							
											if wm.isStopped {
							 | 
						||
| 
								 | 
							
												return
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											log.Warn("WebSocket 连接断开,尝试重连...")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											if wm.ws != nil {
							 | 
						||
| 
								 | 
							
												wm.ws.Close()
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											// 取消旧的上下文
							 | 
						||
| 
								 | 
							
											if wm.cancelFunc != nil {
							 | 
						||
| 
								 | 
							
												wm.cancelFunc()
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											for {
							 | 
						||
| 
								 | 
							
												newCtx, cancel := context.WithCancel(context.Background())
							 | 
						||
| 
								 | 
							
												wm.cancelFunc = cancel // 更新 cancelFunc
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
												if err := wm.connect(newCtx); err != nil {
							 | 
						||
| 
								 | 
							
													log.Errorf("重连失败: %v", err)
							 | 
						||
| 
								 | 
							
													cancel()
							 | 
						||
| 
								 | 
							
													retryCount++
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
													if retryCount >= maxRetries {
							 | 
						||
| 
								 | 
							
														log.Error("重连失败次数过多,退出重连逻辑")
							 | 
						||
| 
								 | 
							
														return
							 | 
						||
| 
								 | 
							
													}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
													time.Sleep(5 * time.Second)
							 | 
						||
| 
								 | 
							
													continue
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
												return
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// 定期删除listenkey 并重启ws
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) startListenKeyRenewal(ctx context.Context, listenKey string) {
							 | 
						||
| 
								 | 
							
									time.Sleep(30 * time.Minute)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									select {
							 | 
						||
| 
								 | 
							
									case <-ctx.Done():
							 | 
						||
| 
								 | 
							
										return
							 | 
						||
| 
								 | 
							
									default:
							 | 
						||
| 
								 | 
							
										if err := wm.deleteListenKey(listenKey); err != nil {
							 | 
						||
| 
								 | 
							
											log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
							 | 
						||
| 
								 | 
							
										} else {
							 | 
						||
| 
								 | 
							
											log.Debug("Successfully delete listenKey")
							 | 
						||
| 
								 | 
							
											wm.reconnect <- struct{}{}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// ticker := time.NewTicker(5 * time.Minute)
							 | 
						||
| 
								 | 
							
									// defer ticker.Stop()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// for {
							 | 
						||
| 
								 | 
							
									// 	select {
							 | 
						||
| 
								 | 
							
									// 	case <-ticker.C:
							 | 
						||
| 
								 | 
							
									// 		if wm.isStopped {
							 | 
						||
| 
								 | 
							
									// 			return
							 | 
						||
| 
								 | 
							
									// 		}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// 		if err := wm.deleteListenKey(listenKey); err != nil {
							 | 
						||
| 
								 | 
							
									// 			log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
							 | 
						||
| 
								 | 
							
									// 		} else {
							 | 
						||
| 
								 | 
							
									// 			log.Debug("Successfully delete listenKey")
							 | 
						||
| 
								 | 
							
									// 			wm.reconnect <- struct{}{}
							 | 
						||
| 
								 | 
							
									// 			return
							 | 
						||
| 
								 | 
							
									// 		}
							 | 
						||
| 
								 | 
							
									// 	case <-ctx.Done():
							 | 
						||
| 
								 | 
							
									// 		return
							 | 
						||
| 
								 | 
							
									// 	}
							 | 
						||
| 
								 | 
							
									// }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// 定时续期
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) {
							 | 
						||
| 
								 | 
							
									ticker := time.NewTicker(30 * time.Minute)
							 | 
						||
| 
								 | 
							
									defer func() {
							 | 
						||
| 
								 | 
							
										log.Debug("定时续期任务退出 key:", wm.apiKey)
							 | 
						||
| 
								 | 
							
										ticker.Stop()
							 | 
						||
| 
								 | 
							
									}()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									for {
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										case <-ticker.C:
							 | 
						||
| 
								 | 
							
											if wm.isStopped {
							 | 
						||
| 
								 | 
							
												return
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											if err := wm.renewListenKey(wm.listenKey); err != nil {
							 | 
						||
| 
								 | 
							
												log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										case <-ctx.Done():
							 | 
						||
| 
								 | 
							
											return
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/*
							 | 
						||
| 
								 | 
							
								删除listenkey
							 | 
						||
| 
								 | 
							
								*/
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) deleteListenKey(listenKey string) error {
							 | 
						||
| 
								 | 
							
									client, err := wm.createBinanceClient()
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									var resp []byte
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									switch wm.wsType {
							 | 
						||
| 
								 | 
							
									case 0:
							 | 
						||
| 
								 | 
							
										path := fmt.Sprintf("/api/v3/userDataStream")
							 | 
						||
| 
								 | 
							
										params := map[string]interface{}{
							 | 
						||
| 
								 | 
							
											"listenKey": listenKey,
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										resp, _, err = client.SendSpotRequestByKey(path, "DELETE", params)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
							 | 
						||
| 
								 | 
							
									case 1:
							 | 
						||
| 
								 | 
							
										resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "DELETE", nil)
							 | 
						||
| 
								 | 
							
										log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
							 | 
						||
| 
								 | 
							
									default:
							 | 
						||
| 
								 | 
							
										return errors.New("unknown ws type")
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return err
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (wm *BinanceWebSocketManager) renewListenKey(listenKey string) error {
							 | 
						||
| 
								 | 
							
									// payloadParam := map[string]interface{}{
							 | 
						||
| 
								 | 
							
									// 	"listenKey": listenKey,
							 | 
						||
| 
								 | 
							
									// 	"apiKey":    wm.apiKey,
							 | 
						||
| 
								 | 
							
									// }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// params := map[string]interface{}{
							 | 
						||
| 
								 | 
							
									// 	"id":     getUUID(),
							 | 
						||
| 
								 | 
							
									// 	"method": "userDataStream.ping",
							 | 
						||
| 
								 | 
							
									// 	"params": payloadParam,
							 | 
						||
| 
								 | 
							
									// }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// if err := wm.ws.WriteJSON(params); err != nil {
							 | 
						||
| 
								 | 
							
									// 	return err
							 | 
						||
| 
								 | 
							
									// }
							 | 
						||
| 
								 | 
							
									// wm.ws.WriteJSON()
							 | 
						||
| 
								 | 
							
									client, err := wm.createBinanceClient()
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										return err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									var resp []byte
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									switch wm.wsType {
							 | 
						||
| 
								 | 
							
									case 0:
							 | 
						||
| 
								 | 
							
										path := fmt.Sprintf("/api/v3/userDataStream?listenKey=%s", listenKey)
							 | 
						||
| 
								 | 
							
										resp, _, err = client.SendSpotRequestByKey(path, "PUT", nil)
							 | 
						||
| 
								 | 
							
										log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp)))
							 | 
						||
| 
								 | 
							
									case 1:
							 | 
						||
| 
								 | 
							
										// path := fmt.Sprintf("/fapi/v1/listenKey", listenKey)
							 | 
						||
| 
								 | 
							
										resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "PUT", nil)
							 | 
						||
| 
								 | 
							
										log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp)))
							 | 
						||
| 
								 | 
							
									default:
							 | 
						||
| 
								 | 
							
										return errors.New("unknown ws type")
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func getWsTypeName(wsType int) string {
							 | 
						||
| 
								 | 
							
									switch wsType {
							 | 
						||
| 
								 | 
							
									case 0:
							 | 
						||
| 
								 | 
							
										return "spot"
							 | 
						||
| 
								 | 
							
									case 1:
							 | 
						||
| 
								 | 
							
										return "futures"
							 | 
						||
| 
								 | 
							
									default:
							 | 
						||
| 
								 | 
							
										return "unknown"
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								func getUUID() string {
							 | 
						||
| 
								 | 
							
									return fmt.Sprintf("%s-%s-%s-%s-%s", randomHex(8), randomHex(4), randomHex(4), randomHex(4), randomHex(12))
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func randomHex(n int) string {
							 | 
						||
| 
								 | 
							
									rand.New(rand.NewSource(time.Now().UnixNano()))
							 | 
						||
| 
								 | 
							
									hexChars := "0123456789abcdef"
							 | 
						||
| 
								 | 
							
									bytes := make([]byte, n)
							 | 
						||
| 
								 | 
							
									for i := 0; i < n; i++ {
							 | 
						||
| 
								 | 
							
										bytes[i] = hexChars[rand.Intn(len(hexChars))]
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return string(bytes)
							 | 
						||
| 
								 | 
							
								}
							 |