Files
exchange_go/services/excservice/binancesocketmanager.go
2025-08-18 15:10:37 +08:00

619 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package excservice
import (
"context"
"crypto/tls"
"errors"
"fmt"
"go-admin/common/global"
"go-admin/common/helper"
"go-admin/models/binancedto"
"go-admin/models/commondto"
"go-admin/pkg/jsonhelper"
"go-admin/pkg/utility"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"time"
"github.com/bytedance/sonic"
log "github.com/go-admin-team/go-admin-core/logger"
"github.com/gorilla/websocket"
"golang.org/x/net/proxy"
)
// BinanceWebSocketManager holds the state of one Binance user-data WebSocket
// connection: the live connection, the credentials and proxy settings used to
// establish it, and the channels/flags used to stop or reconnect it.
type BinanceWebSocketManager struct {
ws *websocket.Conn // current connection; assigned by connect
stopChannel chan struct{} // closed by Stop; run blocks on it after a successful connect
url string // base stream URL chosen from wsType in the constructor
/* 0 = spot, 1 = futures */
wsType int
apiKey string
apiSecret string
proxyType string // scheme prepended to proxyAddress by getDialer when the address has none
proxyAddress string
reconnect chan struct{} // reconnect requests consumed by handleReconnect
isStopped bool // marks whether the WebSocket has been stopped deliberately
mu sync.Mutex // guards concurrent access to isStopped (taken by Stop and the Restart method)
cancelFunc context.CancelFunc // cancels the context of the current connection's goroutines
listenKey string // newly added field: listenKey of the current connection, set by connect
}
// Existing connections: one registry for spot managers and one for futures
// managers. NOTE(review): the meaning of the map key is not visible in this
// file (presumably the API key) — confirm against the callers.
var SpotSockets = map[string]*BinanceWebSocketManager{}
var FutureSockets = map[string]*BinanceWebSocketManager{}
// NewBinanceWebSocketManager builds a manager that is not yet connected.
//
// wsType selects the stream endpoint: 0 uses the spot URL, 1 uses the futures
// URL, and any other value leaves the URL empty. The stop and reconnect
// channels are created with a buffer of 10 and the manager starts in the
// "not stopped" state.
func NewBinanceWebSocketManager(wsType int, apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager {
	manager := &BinanceWebSocketManager{
		wsType:       wsType,
		apiKey:       apiKey,
		apiSecret:    apiSecret,
		proxyType:    proxyType,
		proxyAddress: proxyAddress,
		isStopped:    false,
		stopChannel:  make(chan struct{}, 10),
		reconnect:    make(chan struct{}, 10),
	}

	if wsType == 0 {
		manager.url = "wss://stream.binance.com:9443/ws"
	} else if wsType == 1 {
		manager.url = "wss://fstream.binance.com/ws"
	}

	return manager
}
// Start hands the manager's run loop to utility.SafeGo and returns
// immediately. NOTE(review): SafeGo presumably runs the function in a
// panic-recovering goroutine — confirm in the utility package.
func (wm *BinanceWebSocketManager) Start() {
utility.SafeGo(wm.run)
// wm.run()
}
// Restart updates the credentials and proxy settings and then restarts the
// connection.
//
// If the manager was stopped, it is marked running again and a new run loop is
// launched. Otherwise a reconnect request is queued for handleReconnect.
//
// The reconnect request is sent without blocking: this method holds wm.mu, and
// a blocking send on a full channel with no active consumer would keep the
// lock forever and deadlock Stop. A full channel already means reconnects are
// pending, so dropping the extra request loses nothing.
//
// It returns the receiver so calls can be chained.
func (wm *BinanceWebSocketManager) Restart(apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager {
	wm.mu.Lock()
	defer wm.mu.Unlock()

	wm.apiKey = apiKey
	wm.apiSecret = apiSecret
	wm.proxyType = proxyType
	wm.proxyAddress = proxyAddress

	if wm.isStopped {
		wm.isStopped = false
		utility.SafeGo(wm.run)
		return wm
	}

	select {
	case wm.reconnect <- struct{}{}:
	default:
		log.Warn(fmt.Sprintf("reconnect queue full, restart request dropped key:%s", wm.apiKey))
	}
	return wm
}
// Restart queues a reconnect request on the manager's reconnect channel.
// The send blocks while the channel's buffer is full.
func Restart(wm *BinanceWebSocketManager) {
wm.reconnect <- struct{}{}
}
// run is the manager's main loop.
//
// It creates the cancellable context for the connection, starts the signal
// handler, resets the error record stored in Redis for this API key, and then
// tries to connect. A failed attempt is recorded through
// handleConnectionError; once isErrorCountExceeded reports true the manager is
// stopped, otherwise the attempt is repeated after 5 seconds. After a
// successful connect the loop blocks until stopChannel is signalled and then
// stops the manager.
func (wm *BinanceWebSocketManager) run() {
	ctx, cancel := context.WithCancel(context.Background())
	wm.cancelFunc = cancel

	utility.SafeGo(wm.handleSignal)

	// Compute the error-record key and start from a fresh record.
	errKey := fmt.Sprintf(global.API_WEBSOCKET_ERR, wm.apiKey)
	errMessage := commondto.WebSocketErr{Time: time.Now()}
	helper.DefaultRedis.SetString(errKey, jsonhelper.ToJsonString(errMessage))

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		if err := wm.connect(ctx); err != nil {
			wm.handleConnectionError(errKey, err)
			if wm.isErrorCountExceeded(errKey) {
				// Errorf is required here: log.Error does not expand format verbs.
				log.Errorf("连接 %s WebSocket 时出错次数过多,停止 WebSocket 管理器: %v", getWsTypeName(wm.wsType), wm.apiKey)
				wm.Stop()
				return
			}

			// Back off before retrying, but leave promptly once the context is cancelled.
			select {
			case <-ctx.Done():
				return
			case <-time.After(5 * time.Second):
			}
			continue
		}

		<-wm.stopChannel
		log.Info(fmt.Sprintf("停止 %s WebSocket 管理器...", getWsTypeName(wm.wsType)))
		wm.Stop()
		return
	}
}
// handleConnectionError records a failed connection attempt.
//
// It loads the error record stored in Redis under errKey (if any), increments
// its counter, stamps it with the current time and the error text, writes it
// back, and logs the failure. Decode/encode problems are logged instead of
// being silently ignored; the update still proceeds on a best-effort basis.
//
// err must be non-nil.
func (wm *BinanceWebSocketManager) handleConnectionError(errKey string, err error) {
	// Load the existing error record from Redis.
	var errMessage commondto.WebSocketErr
	val, _ := helper.DefaultRedis.GetString(errKey)
	if val != "" {
		if decodeErr := sonic.UnmarshalString(val, &errMessage); decodeErr != nil {
			log.Errorf("decode websocket error record %s: %v", errKey, decodeErr)
		}
	}

	// Update the record.
	errMessage.Count++
	errMessage.Time = time.Now()
	errMessage.ErrorMessage = err.Error()

	// Persist the record back to Redis.
	if data, encodeErr := sonic.MarshalString(errMessage); encodeErr != nil {
		log.Errorf("encode websocket error record %s: %v", errKey, encodeErr)
	} else {
		helper.DefaultRedis.SetString(errKey, data)
	}

	// Errorf is required here: log.Error does not expand format verbs, and
	// wsType is an int, so it is rendered through getWsTypeName for %s.
	log.Errorf("连接 %s WebSocket 时出错: %v, 错误: %v", getWsTypeName(wm.wsType), wm.apiKey, err)
}
// isErrorCountExceeded reports whether the error record stored in Redis under
// errKey has a count of at least 5. A missing or undecodable record counts as
// "not exceeded".
func (wm *BinanceWebSocketManager) isErrorCountExceeded(errKey string) bool {
	raw, _ := helper.DefaultRedis.GetString(errKey)
	if raw != "" {
		var record commondto.WebSocketErr
		if decodeErr := sonic.UnmarshalString(raw, &record); decodeErr == nil {
			return record.Count >= 5
		}
	}
	return false
}
// handleSignal waits for an interrupt signal and then stops the manager.
//
// The channel is buffered because the signal package does not block when
// delivering: with an unbuffered channel a signal arriving before the receive
// is ready would be dropped. The registration is removed again on return.
func (wm *BinanceWebSocketManager) handleSignal() {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	defer signal.Stop(ch)

	<-ch
	wm.Stop()
}
// connect establishes a new user-data stream connection.
//
// It builds a dialer from the proxy settings, requests a listenKey, stores it
// on the manager, dials "<url>/<listenKey>" and installs a ping handler that
// answers with a pong (up to 5 attempts, one second apart) and refreshes the
// last-communication timestamp. On success it launches the listenKey renewal,
// message reader and reconnect handler, all bound to ctx.
//
// Any error from the dialer setup, the listenKey request or the dial is
// returned unchanged.
func (wm *BinanceWebSocketManager) connect(ctx context.Context) error {
	dialer, err := wm.getDialer()
	if err != nil {
		return err
	}

	key, err := wm.getListenKey()
	if err != nil {
		return err
	}
	wm.listenKey = key

	endpoint := wm.url + "/" + key
	if wm.ws, _, err = dialer.Dial(endpoint, nil); err != nil {
		return err
	}
	log.Info(fmt.Sprintf("已连接到 Binance %s WebSocket【%s】 key:%s", getWsTypeName(wm.wsType), wm.apiKey, key))

	// Reply to server pings.
	wm.ws.SetPingHandler(func(appData string) error {
		log.Info(fmt.Sprintf("收到 wstype: %v key:%s Ping 消息【%s】", wm.wsType, wm.apiKey, appData))

		for attempt := 0; attempt < 5; attempt++ {
			deadline := time.Now().Add(time.Second * 10)
			pongErr := wm.ws.WriteControl(websocket.PongMessage, []byte(appData), deadline)
			if pongErr == nil {
				break
			}
			log.Error("binance 回应pong失败 次数:", strconv.Itoa(attempt), " err:", pongErr)
			time.Sleep(time.Second * 1)
		}

		setLastTime(wm)
		return nil
	})

	// Background workers for this connection; each one stops when ctx is cancelled.
	utility.SafeGo(func() { wm.startListenKeyRenewal2(ctx) })
	utility.SafeGo(func() { wm.readMessages(ctx) })
	utility.SafeGo(func() { wm.handleReconnect(ctx) })

	return nil
}
// setLastTime refreshes the last-communication time for the manager's API key.
//
// It reads the subscription state stored in Redis for the key, sets the spot
// timestamp (wsType 0) or the futures timestamp (any other wsType) to now, and
// writes the state back when it encodes to a non-empty string.
func setLastTime(wm *BinanceWebSocketManager) {
	subKey := fmt.Sprintf(global.USER_SUBSCRIBE, wm.apiKey)
	stored, _ := helper.DefaultRedis.GetString(subKey)
	now := time.Now()

	var state binancedto.UserSubscribeState
	if stored != "" {
		sonic.Unmarshal([]byte(stored), &state)
	}

	switch wm.wsType {
	case 0:
		state.SpotLastTime = &now
	default:
		state.FuturesLastTime = &now
	}

	if encoded, _ := sonic.MarshalString(&state); encoded != "" {
		helper.DefaultRedis.SetString(subKey, encoded)
	}
}
// getDialer builds the WebSocket dialer that matches the proxy settings.
//
// With no proxy address a plain dialer is returned. Otherwise the address is
// given a scheme (proxyType) when it has none of http://, https:// or
// socks5://; the normalized address is stored back on the manager. A socks5
// URL is delegated to createSocks5Dialer, http/https URLs produce a dialer
// that uses the URL as its HTTP proxy, and any other scheme is an error.
func (wm *BinanceWebSocketManager) getDialer() (*websocket.Dialer, error) {
	if wm.proxyAddress == "" {
		return &websocket.Dialer{}, nil
	}

	hasScheme := false
	for _, prefix := range []string{"http://", "https://", "socks5://"} {
		if strings.HasPrefix(wm.proxyAddress, prefix) {
			hasScheme = true
			break
		}
	}
	if !hasScheme {
		wm.proxyAddress = wm.proxyType + "://" + wm.proxyAddress
	}

	proxyURL, err := url.Parse(wm.proxyAddress)
	if err != nil {
		return nil, fmt.Errorf("failed to parse proxy URL: %v", err)
	}

	if proxyURL.Scheme == "socks5" {
		return wm.createSocks5Dialer(proxyURL)
	}
	if proxyURL.Scheme == "http" || proxyURL.Scheme == "https" {
		return &websocket.Dialer{
			Proxy:           http.ProxyURL(proxyURL),
			TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
		}, nil
	}
	return nil, fmt.Errorf("unsupported proxy scheme: %s", proxyURL.Scheme)
}
// createSocks5Dialer returns a WebSocket dialer that connects through the
// SOCKS5 proxy described by proxyURL.
//
// Credentials are passed to the proxy only when proxyURL carries user info;
// without them auth stays nil, so no empty username/password pair is offered
// to the proxy. The returned dialer honours the dial context whenever the
// SOCKS5 dialer implements proxy.ContextDialer, and falls back to the plain
// Dial otherwise.
func (wm *BinanceWebSocketManager) createSocks5Dialer(proxyURL *url.URL) (*websocket.Dialer, error) {
	var auth *proxy.Auth
	if proxyURL.User != nil {
		// The second result only says whether a password was set; an unset
		// password is the empty string, which is what we want to send.
		password, _ := proxyURL.User.Password()
		auth = &proxy.Auth{
			User:     proxyURL.User.Username(),
			Password: password,
		}
	}

	socksDialer, err := proxy.SOCKS5("tcp", proxyURL.Host, auth, proxy.Direct)
	if err != nil {
		return nil, fmt.Errorf("failed to create SOCKS5 proxy dialer: %w", err)
	}

	return &websocket.Dialer{
		NetDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
			if ctxDialer, ok := socksDialer.(proxy.ContextDialer); ok {
				return ctxDialer.DialContext(ctx, network, address)
			}
			return socksDialer.Dial(network, address)
		},
	}, nil
}
// createBinanceClient centralizes creation of the Binance HTTP client: it
// calls helper.NewBinanceClient with the manager's API key, API secret, proxy
// type and proxy address, and returns its result unchanged.
func (wm *BinanceWebSocketManager) createBinanceClient() (*helper.BinanceClient, error) {
return helper.NewBinanceClient(wm.apiKey, wm.apiSecret, wm.proxyType, wm.proxyAddress)
}
// getListenKey requests a listenKey for the user-data stream.
//
// It POSTs to the spot endpoint (wsType 0) or the futures endpoint (wsType 1)
// and extracts the "listenKey" field from the JSON response. An error is
// returned when the client cannot be created, the wsType is unknown, the
// request or decoding fails, or the response holds no non-empty string
// listenKey (for example when the body is an error payload).
func (wm *BinanceWebSocketManager) getListenKey() (string, error) {
	client, err := wm.createBinanceClient()
	if err != nil {
		return "", err
	}

	var resp []byte
	switch wm.wsType {
	case 0:
		resp, _, err = client.SendSpotRequestByKey("/api/v3/userDataStream", "POST", nil)
	case 1:
		resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "POST", nil)
	default:
		log.Error("链接类型错误")
		return "", errors.New("链接类型错误")
	}
	if err != nil {
		return "", err
	}

	var dataMap map[string]interface{}
	if err := sonic.Unmarshal(resp, &dataMap); err != nil {
		return "", err
	}

	// Checked assertion: a missing, non-string or empty value must surface as
	// an error instead of panicking or yielding an unusable stream URL.
	listenKey, ok := dataMap["listenKey"].(string)
	if !ok || listenKey == "" {
		return "", errors.New("listenKey 不存在")
	}
	return listenKey, nil
}
// readMessages reads from the connection until ctx is cancelled, the manager
// is stopped, or a read fails, and passes every message to handleOrderUpdate.
//
// The connection is captured once at start so this goroutine always reads
// from, and finally closes, the connection it was started for, even after
// wm.ws has been replaced by a reconnect. A read error whose text contains
// "websocket: close" queues a reconnect request (unless the manager is
// stopped); any other read error only ends the loop.
func (wm *BinanceWebSocketManager) readMessages(ctx context.Context) {
	conn := wm.ws
	defer conn.Close()

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		if wm.isStopped {
			return
		}

		_, msg, err := conn.ReadMessage()
		if err != nil {
			if strings.Contains(err.Error(), "websocket: close") {
				if !wm.isStopped {
					wm.reconnect <- struct{}{}
				}
				log.Error("websocket 关闭")
				return
			}
			// Errorf is required here: log.Error does not expand format verbs.
			log.Errorf("读取消息时出错: %v", err)
			return
		}

		wm.handleOrderUpdate(msg)
	}
}
// handleOrderUpdate processes one incoming message: it refreshes the
// last-communication time, passes the payload to ReceiveListen, and queues a
// reconnect request when ReceiveListen asks for one. The error returned by
// ReceiveListen is ignored.
func (wm *BinanceWebSocketManager) handleOrderUpdate(msg []byte) {
	setLastTime(wm)

	needReconnect, _ := ReceiveListen(msg, wm.wsType)
	if !needReconnect {
		return
	}
	wm.reconnect <- struct{}{}
}
// Stop shuts the manager down. Under wm.mu it marks the manager as stopped,
// closes stopChannel, cancels the current context, closes the WebSocket
// connection (logging the outcome), and finally installs a fresh stopChannel.
// Calling Stop on an already stopped manager does nothing.
func (wm *BinanceWebSocketManager) Stop() {
wm.mu.Lock()
defer wm.mu.Unlock()
if wm.isStopped {
return
}
wm.isStopped = true
// Close stopChannel. The receive case fires when the channel is already
// closed (or holds a pending value), in which case close is skipped to
// avoid a double-close panic.
select {
case <-wm.stopChannel:
default:
close(wm.stopChannel)
}
if wm.cancelFunc != nil {
wm.cancelFunc()
}
if wm.ws != nil {
if err := wm.ws.Close(); err != nil {
log.Error(fmt.Sprintf("key【%s】close失败", wm.apiKey), err)
} else {
log.Info(fmt.Sprintf("key【%s】close", wm.apiKey))
}
}
// Recreate stopChannel so that it is usable again after Restart().
wm.stopChannel = make(chan struct{})
}
// handleReconnect waits for a reconnect request and re-establishes the
// connection.
//
// On a request it closes the current connection, cancels the current context
// and calls connect with a fresh context, retrying every 5 seconds up to
// maxRetries times. It returns after a successful connect, because connect
// starts a new handleReconnect for the new connection.
//
// The stopped flag is re-checked before every attempt, after the new cancel
// function has been published. Without that check a Stop() issued during the
// backoff would be followed by a fresh connection, since the cancel function
// Stop calls at that point belongs to the previous, already failed attempt.
//
// ctx is not consulted inside the retry loop on purpose: it is the context
// this function has just cancelled itself.
func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) {
	const maxRetries = 100 // maximum number of reconnect attempts
	retryCount := 0

	for {
		select {
		case <-ctx.Done():
			return
		case <-wm.reconnect:
			if wm.isStopped {
				return
			}

			log.Warn("WebSocket 连接断开,尝试重连...")

			if wm.ws != nil {
				wm.ws.Close()
			}

			// Cancel the old context so the previous connection's goroutines exit.
			if wm.cancelFunc != nil {
				wm.cancelFunc()
			}

			for {
				newCtx, cancel := context.WithCancel(context.Background())
				wm.cancelFunc = cancel // publish the new cancel function before checking the flag

				if wm.isStopped {
					cancel()
					return
				}

				if err := wm.connect(newCtx); err != nil {
					log.Errorf("重连失败: %v", err)
					cancel()
					retryCount++

					if retryCount >= maxRetries {
						log.Error("重连失败次数过多,退出重连逻辑")
						return
					}

					time.Sleep(5 * time.Second)
					continue
				}

				return
			}
		}
	}
}
// startListenKeyRenewal deletes the given listenKey after 30 minutes and then
// queues a reconnect request so that a new key is obtained.
//
// The wait ends early when ctx is cancelled, so the goroutine does not linger
// for the rest of the 30 minutes after the connection is gone. A failed delete
// is logged and no reconnect is requested.
func (wm *BinanceWebSocketManager) startListenKeyRenewal(ctx context.Context, listenKey string) {
	timer := time.NewTimer(30 * time.Minute)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return
	case <-timer.C:
	}

	if err := wm.deleteListenKey(listenKey); err != nil {
		// Errorf with one verb per argument: log.Error does not expand format verbs.
		log.Errorf("Failed to delete listenKey, type:%v key: %s err: %v", wm.wsType, wm.apiKey, err)
		return
	}

	log.Debug("Successfully delete listenKey")
	wm.reconnect <- struct{}{}
}
// startListenKeyRenewal2 renews the manager's current listenKey every 30
// minutes until ctx is cancelled or the manager is found stopped on a tick.
// A failed renewal is logged and the loop keeps running.
func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Minute)
	defer func() {
		log.Debug("定时续期任务退出 key:", wm.apiKey)
		ticker.Stop()
	}()

	for {
		select {
		case <-ticker.C:
			if wm.isStopped {
				return
			}

			if err := wm.renewListenKey(wm.listenKey); err != nil {
				// Errorf with one verb per argument: log.Error does not expand format verbs.
				log.Errorf("Failed to renew listenKey, type:%v key: %s err: %v", wm.wsType, wm.apiKey, err)
			}
		case <-ctx.Done():
			return
		}
	}
}
// deleteListenKey issues a DELETE for the user-data stream listenKey.
//
// For spot (wsType 0) the key is sent as the "listenKey" request parameter;
// for futures (wsType 1) the request carries no parameters. The response body
// is logged at debug level and the request error, if any, is returned. An
// unknown wsType or a failure to create the client is returned as an error.
func (wm *BinanceWebSocketManager) deleteListenKey(listenKey string) error {
	client, err := wm.createBinanceClient()
	if err != nil {
		return err
	}

	var body []byte
	switch wm.wsType {
	case 0:
		body, _, err = client.SendSpotRequestByKey(
			"/api/v3/userDataStream",
			"DELETE",
			map[string]interface{}{"listenKey": listenKey},
		)
	case 1:
		body, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "DELETE", nil)
	default:
		return errors.New("unknown ws type")
	}

	log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(body)))
	return err
}
// renewListenKey issues a PUT that keeps the user-data stream listenKey alive.
//
// For spot (wsType 0) the key is sent in the query string; for futures
// (wsType 1) the request carries no parameters. The response body is logged at
// debug level.
//
// The request error is returned to the caller so that a failed renewal is
// actually reported. An unknown wsType or a failure to create the client is
// also returned as an error.
func (wm *BinanceWebSocketManager) renewListenKey(listenKey string) error {
	client, err := wm.createBinanceClient()
	if err != nil {
		return err
	}

	var resp []byte
	switch wm.wsType {
	case 0:
		path := fmt.Sprintf("/api/v3/userDataStream?listenKey=%s", listenKey)
		resp, _, err = client.SendSpotRequestByKey(path, "PUT", nil)
	case 1:
		resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "PUT", nil)
	default:
		return errors.New("unknown ws type")
	}

	log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp)))
	return err
}
// getWsTypeName maps a connection type to its display name: 0 is "spot",
// 1 is "futures", and every other value is "unknown".
func getWsTypeName(wsType int) string {
	if wsType == 0 {
		return "spot"
	}
	if wsType == 1 {
		return "futures"
	}
	return "unknown"
}
func getUUID() string {
return fmt.Sprintf("%s-%s-%s-%s-%s", randomHex(8), randomHex(4), randomHex(4), randomHex(4), randomHex(12))
}
// hexRand is the time-seeded generator used by randomHex. A *rand.Rand is not
// safe for concurrent use, so every access is guarded by hexRandMu.
var (
	hexRandMu sync.Mutex
	hexRand   = rand.New(rand.NewSource(time.Now().UnixNano()))
)

// randomHex returns n random lowercase hexadecimal characters. It returns the
// empty string when n is zero or negative.
//
// The generator is seeded once at package initialization. The generator that
// used to be created inside this function was discarded immediately, so the
// intended time-based seeding never took effect.
//
// The output is not cryptographically secure.
func randomHex(n int) string {
	const hexChars = "0123456789abcdef"
	if n <= 0 {
		return ""
	}

	buf := make([]byte, n)

	hexRandMu.Lock()
	defer hexRandMu.Unlock()
	for i := range buf {
		buf[i] = hexChars[hexRand.Intn(len(hexChars))]
	}
	return string(buf)
}