618 lines
14 KiB
Go
618 lines
14 KiB
Go
package excservice
|
||
|
||
import (
|
||
"context"
|
||
"crypto/tls"
|
||
"errors"
|
||
"fmt"
|
||
"go-admin/common/global"
|
||
"go-admin/common/helper"
|
||
"go-admin/models/binancedto"
|
||
"go-admin/models/commondto"
|
||
"go-admin/pkg/jsonhelper"
|
||
"go-admin/pkg/utility"
|
||
"math/rand"
|
||
"net"
|
||
"net/http"
|
||
"net/url"
|
||
"os"
|
||
"os/signal"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/bytedance/sonic"
|
||
log "github.com/go-admin-team/go-admin-core/logger"
|
||
"github.com/gorilla/websocket"
|
||
"golang.org/x/net/proxy"
|
||
)
|
||
|
||
type BinanceWebSocketManager struct {
|
||
ws *websocket.Conn
|
||
stopChannel chan struct{}
|
||
url string
|
||
/* 0-现货 1-合约 */
|
||
wsType int
|
||
apiKey string
|
||
apiSecret string
|
||
proxyType string
|
||
proxyAddress string
|
||
reconnect chan struct{}
|
||
isStopped bool // 标记 WebSocket 是否已主动停止
|
||
mu sync.Mutex // 用于控制并发访问 isStopped
|
||
cancelFunc context.CancelFunc
|
||
listenKey string // 新增字段
|
||
}
|
||
|
||
// 已有连接
|
||
var SpotSockets = map[string]*BinanceWebSocketManager{}
|
||
var FutureSockets = map[string]*BinanceWebSocketManager{}
|
||
|
||
func NewBinanceWebSocketManager(wsType int, apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager {
|
||
url := ""
|
||
|
||
switch wsType {
|
||
case 0:
|
||
url = "wss://stream.binance.com:9443/ws"
|
||
case 1:
|
||
url = "wss://fstream.binance.com/ws"
|
||
}
|
||
|
||
return &BinanceWebSocketManager{
|
||
stopChannel: make(chan struct{}, 10),
|
||
reconnect: make(chan struct{}, 10),
|
||
isStopped: false,
|
||
url: url,
|
||
wsType: wsType,
|
||
apiKey: apiKey,
|
||
apiSecret: apiSecret,
|
||
proxyType: proxyType,
|
||
proxyAddress: proxyAddress,
|
||
}
|
||
}
|
||
|
||
func (wm *BinanceWebSocketManager) Start() {
|
||
utility.SafeGo(wm.run)
|
||
// wm.run()
|
||
}
|
||
|
||
// 重启连接
|
||
func (wm *BinanceWebSocketManager) Restart(apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager {
|
||
wm.mu.Lock()
|
||
defer wm.mu.Unlock()
|
||
|
||
wm.apiKey = apiKey
|
||
wm.apiSecret = apiSecret
|
||
wm.proxyType = proxyType
|
||
wm.proxyAddress = proxyAddress
|
||
|
||
if wm.isStopped {
|
||
wm.isStopped = false
|
||
utility.SafeGo(wm.run)
|
||
} else {
|
||
wm.reconnect <- struct{}{}
|
||
}
|
||
|
||
return wm
|
||
}
|
||
|
||
func Restart(wm *BinanceWebSocketManager) {
|
||
wm.reconnect <- struct{}{}
|
||
}
|
||
func (wm *BinanceWebSocketManager) run() {
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
wm.cancelFunc = cancel
|
||
|
||
utility.SafeGo(wm.handleSignal)
|
||
|
||
// 计算错误记录键
|
||
errKey := fmt.Sprintf(global.API_WEBSOCKET_ERR, wm.apiKey)
|
||
errMessage := commondto.WebSocketErr{Time: time.Now()}
|
||
helper.DefaultRedis.SetString(errKey, jsonhelper.ToJsonString(errMessage))
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
default:
|
||
if err := wm.connect(ctx); err != nil {
|
||
wm.handleConnectionError(errKey, err)
|
||
|
||
if wm.isErrorCountExceeded(errKey) {
|
||
log.Error("连接 %s WebSocket 时出错次数过多,停止 WebSocket 管理器: %v", wm.wsType, wm.apiKey)
|
||
wm.Stop()
|
||
return
|
||
}
|
||
|
||
time.Sleep(5 * time.Second)
|
||
continue
|
||
}
|
||
|
||
<-wm.stopChannel
|
||
log.Info("停止 %s WebSocket 管理器...", getWsTypeName(wm.wsType))
|
||
wm.Stop()
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// handleConnectionError 处理 WebSocket 连接错误
|
||
func (wm *BinanceWebSocketManager) handleConnectionError(errKey string, err error) {
|
||
// 从 Redis 获取错误记录
|
||
var errMessage commondto.WebSocketErr
|
||
val, _ := helper.DefaultRedis.GetString(errKey)
|
||
if val != "" {
|
||
sonic.UnmarshalString(val, &errMessage)
|
||
}
|
||
|
||
// 更新错误记录
|
||
errMessage.Count++
|
||
errMessage.Time = time.Now()
|
||
errMessage.ErrorMessage = err.Error()
|
||
|
||
// 将错误记录保存到 Redis
|
||
if data, err := sonic.MarshalString(errMessage); err == nil {
|
||
helper.DefaultRedis.SetString(errKey, data)
|
||
}
|
||
|
||
// 记录错误日志
|
||
log.Error("连接 %s WebSocket 时出错: %v, 错误: %v", wm.wsType, wm.apiKey, err)
|
||
}
|
||
|
||
// isErrorCountExceeded 检查错误次数是否超过阈值
|
||
func (wm *BinanceWebSocketManager) isErrorCountExceeded(errKey string) bool {
|
||
val, _ := helper.DefaultRedis.GetString(errKey)
|
||
if val == "" {
|
||
return false
|
||
}
|
||
|
||
var errMessage commondto.WebSocketErr
|
||
if err := sonic.UnmarshalString(val, &errMessage); err != nil {
|
||
return false
|
||
}
|
||
|
||
return errMessage.Count >= 5
|
||
}
|
||
|
||
// 处理终止信号
|
||
func (wm *BinanceWebSocketManager) handleSignal() {
|
||
ch := make(chan os.Signal)
|
||
signal.Notify(ch, os.Interrupt)
|
||
<-ch
|
||
wm.Stop()
|
||
}
|
||
|
||
func (wm *BinanceWebSocketManager) connect(ctx context.Context) error {
|
||
dialer, err := wm.getDialer()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
listenKey, err := wm.getListenKey()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
wm.listenKey = listenKey
|
||
url := fmt.Sprintf("%s/%s", wm.url, listenKey)
|
||
wm.ws, _, err = dialer.Dial(url, nil)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
log.Info(fmt.Sprintf("已连接到 Binance %s WebSocket【%s】 key:%s", getWsTypeName(wm.wsType), wm.apiKey, listenKey))
|
||
|
||
// Ping处理
|
||
wm.ws.SetPingHandler(func(appData string) error {
|
||
log.Info(fmt.Sprintf("收到 wstype: %v key:%s Ping 消息【%s】", wm.wsType, wm.apiKey, appData))
|
||
|
||
for x := 0; x < 5; x++ {
|
||
if err := wm.ws.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(time.Second*10)); err != nil {
|
||
log.Error("binance 回应pong失败 次数:", strconv.Itoa(x), " err:", err)
|
||
|
||
time.Sleep(time.Second * 1)
|
||
continue
|
||
}
|
||
|
||
break
|
||
}
|
||
|
||
setLastTime(wm)
|
||
return nil
|
||
})
|
||
|
||
// utility.SafeGoParam(wm.restartConnect, ctx)
|
||
utility.SafeGo(func() { wm.startListenKeyRenewal2(ctx) })
|
||
utility.SafeGo(func() { wm.readMessages(ctx) })
|
||
utility.SafeGo(func() { wm.handleReconnect(ctx) })
|
||
|
||
return nil
|
||
}
|
||
|
||
// 更新最后通信时间
|
||
func setLastTime(wm *BinanceWebSocketManager) {
|
||
subKey := fmt.Sprintf(global.USER_SUBSCRIBE, wm.apiKey)
|
||
val, _ := helper.DefaultRedis.GetString(subKey)
|
||
now := time.Now()
|
||
var data binancedto.UserSubscribeState
|
||
if val != "" {
|
||
sonic.Unmarshal([]byte(val), &data)
|
||
}
|
||
|
||
if wm.wsType == 0 {
|
||
data.SpotLastTime = &now
|
||
} else {
|
||
data.FuturesLastTime = &now
|
||
}
|
||
|
||
val, _ = sonic.MarshalString(&data)
|
||
|
||
if val != "" {
|
||
helper.DefaultRedis.SetString(subKey, val)
|
||
}
|
||
}
|
||
|
||
func (wm *BinanceWebSocketManager) getDialer() (*websocket.Dialer, error) {
|
||
if wm.proxyAddress == "" {
|
||
return &websocket.Dialer{}, nil
|
||
}
|
||
|
||
if !strings.HasPrefix(wm.proxyAddress, "http://") && !strings.HasPrefix(wm.proxyAddress, "https://") && !strings.HasPrefix(wm.proxyAddress, "socks5://") {
|
||
wm.proxyAddress = wm.proxyType + "://" + wm.proxyAddress
|
||
}
|
||
|
||
proxyURL, err := url.Parse(wm.proxyAddress)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to parse proxy URL: %v", err)
|
||
}
|
||
|
||
transport := &http.Transport{
|
||
TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
|
||
}
|
||
|
||
switch proxyURL.Scheme {
|
||
case "socks5":
|
||
return wm.createSocks5Dialer(proxyURL)
|
||
case "http", "https":
|
||
transport.Proxy = http.ProxyURL(proxyURL)
|
||
return &websocket.Dialer{Proxy: transport.Proxy, TLSClientConfig: transport.TLSClientConfig}, nil
|
||
default:
|
||
return nil, fmt.Errorf("unsupported proxy scheme: %s", proxyURL.Scheme)
|
||
}
|
||
}
|
||
|
||
func (wm *BinanceWebSocketManager) createSocks5Dialer(proxyURL *url.URL) (*websocket.Dialer, error) {
|
||
auth := &proxy.Auth{}
|
||
if proxyURL.User != nil {
|
||
auth.User = proxyURL.User.Username()
|
||
auth.Password, _ = proxyURL.User.Password()
|
||
}
|
||
|
||
socksDialer, err := proxy.SOCKS5("tcp", proxyURL.Host, auth, proxy.Direct)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to create SOCKS5 proxy dialer: %v", err)
|
||
}
|
||
|
||
return &websocket.Dialer{
|
||
NetDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
|
||
return socksDialer.Dial(network, address)
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 复用创建HTTP客户端的逻辑
|
||
func (wm *BinanceWebSocketManager) createBinanceClient() (*helper.BinanceClient, error) {
|
||
return helper.NewBinanceClient(wm.apiKey, wm.apiSecret, wm.proxyType, wm.proxyAddress)
|
||
}
|
||
|
||
// 获取listenKey
|
||
func (wm *BinanceWebSocketManager) getListenKey() (string, error) {
|
||
client, err := wm.createBinanceClient()
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
var resp []byte
|
||
switch wm.wsType {
|
||
case 0:
|
||
resp, _, err = client.SendSpotRequestByKey("/api/v3/userDataStream", "POST", nil)
|
||
case 1:
|
||
resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "POST", nil)
|
||
|
||
default:
|
||
log.Error("链接类型错误")
|
||
return "", errors.New("链接类型错误")
|
||
}
|
||
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
var dataMap map[string]interface{}
|
||
if err := sonic.Unmarshal(resp, &dataMap); err != nil {
|
||
return "", err
|
||
}
|
||
|
||
if listenKey, ok := dataMap["listenKey"]; ok {
|
||
return listenKey.(string), nil
|
||
}
|
||
|
||
return "", errors.New("listenKey 不存在")
|
||
}
|
||
|
||
// 接收消息
|
||
func (wm *BinanceWebSocketManager) readMessages(ctx context.Context) {
|
||
defer wm.ws.Close()
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
default:
|
||
if wm.isStopped {
|
||
return
|
||
}
|
||
|
||
_, msg, err := wm.ws.ReadMessage()
|
||
if err != nil && strings.Contains(err.Error(), "websocket: close") {
|
||
if !wm.isStopped {
|
||
wm.reconnect <- struct{}{}
|
||
}
|
||
|
||
log.Error("websocket 关闭")
|
||
return
|
||
} else if err != nil {
|
||
log.Error("读取消息时出错: %v", err)
|
||
return
|
||
}
|
||
wm.handleOrderUpdate(msg)
|
||
|
||
}
|
||
}
|
||
}
|
||
|
||
func (wm *BinanceWebSocketManager) handleOrderUpdate(msg []byte) {
|
||
setLastTime(wm)
|
||
|
||
if reconnect, _ := ReceiveListen(msg, wm.wsType); reconnect {
|
||
wm.reconnect <- struct{}{}
|
||
}
|
||
}
|
||
|
||
func (wm *BinanceWebSocketManager) Stop() {
|
||
wm.mu.Lock()
|
||
defer wm.mu.Unlock()
|
||
|
||
if wm.isStopped {
|
||
return
|
||
}
|
||
|
||
wm.isStopped = true
|
||
// 关闭 stopChannel(确保已经关闭,避免 panic)
|
||
select {
|
||
case <-wm.stopChannel:
|
||
default:
|
||
close(wm.stopChannel)
|
||
}
|
||
|
||
if wm.cancelFunc != nil {
|
||
wm.cancelFunc()
|
||
}
|
||
|
||
if wm.ws != nil {
|
||
if err := wm.ws.Close(); err != nil {
|
||
log.Error(fmt.Sprintf("key【%s】close失败", wm.apiKey), err)
|
||
} else {
|
||
log.Info(fmt.Sprintf("key【%s】close", wm.apiKey))
|
||
}
|
||
}
|
||
|
||
// **重新创建 stopChannel,避免 Restart() 时无效**
|
||
wm.stopChannel = make(chan struct{})
|
||
}
|
||
|
||
// 重连机制
|
||
func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) {
|
||
maxRetries := 5 // 最大重试次数
|
||
retryCount := 0
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-wm.reconnect:
|
||
if wm.isStopped {
|
||
return
|
||
}
|
||
|
||
log.Warn("WebSocket 连接断开,尝试重连...")
|
||
|
||
if wm.ws != nil {
|
||
wm.ws.Close()
|
||
}
|
||
|
||
// 取消旧的上下文
|
||
if wm.cancelFunc != nil {
|
||
wm.cancelFunc()
|
||
}
|
||
|
||
for {
|
||
newCtx, cancel := context.WithCancel(context.Background())
|
||
wm.cancelFunc = cancel // 更新 cancelFunc
|
||
|
||
if err := wm.connect(newCtx); err != nil {
|
||
log.Errorf("重连失败: %v", err)
|
||
cancel()
|
||
retryCount++
|
||
|
||
if retryCount >= maxRetries {
|
||
log.Error("重连失败次数过多,退出重连逻辑")
|
||
return
|
||
}
|
||
|
||
time.Sleep(5 * time.Second)
|
||
continue
|
||
}
|
||
|
||
return
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 定期删除listenkey 并重启ws
|
||
func (wm *BinanceWebSocketManager) startListenKeyRenewal(ctx context.Context, listenKey string) {
|
||
time.Sleep(30 * time.Minute)
|
||
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
default:
|
||
if err := wm.deleteListenKey(listenKey); err != nil {
|
||
log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
|
||
} else {
|
||
log.Debug("Successfully delete listenKey")
|
||
wm.reconnect <- struct{}{}
|
||
}
|
||
}
|
||
|
||
// ticker := time.NewTicker(5 * time.Minute)
|
||
// defer ticker.Stop()
|
||
|
||
// for {
|
||
// select {
|
||
// case <-ticker.C:
|
||
// if wm.isStopped {
|
||
// return
|
||
// }
|
||
|
||
// if err := wm.deleteListenKey(listenKey); err != nil {
|
||
// log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
|
||
// } else {
|
||
// log.Debug("Successfully delete listenKey")
|
||
// wm.reconnect <- struct{}{}
|
||
// return
|
||
// }
|
||
// case <-ctx.Done():
|
||
// return
|
||
// }
|
||
// }
|
||
}
|
||
|
||
// 定时续期
|
||
func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) {
|
||
ticker := time.NewTicker(30 * time.Minute)
|
||
defer func() {
|
||
log.Debug("定时续期任务退出 key:", wm.apiKey)
|
||
ticker.Stop()
|
||
}()
|
||
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
if wm.isStopped {
|
||
return
|
||
}
|
||
|
||
if err := wm.renewListenKey(wm.listenKey); err != nil {
|
||
log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
|
||
}
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
/*
|
||
删除listenkey
|
||
*/
|
||
func (wm *BinanceWebSocketManager) deleteListenKey(listenKey string) error {
|
||
client, err := wm.createBinanceClient()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
var resp []byte
|
||
|
||
switch wm.wsType {
|
||
case 0:
|
||
path := fmt.Sprintf("/api/v3/userDataStream")
|
||
params := map[string]interface{}{
|
||
"listenKey": listenKey,
|
||
}
|
||
resp, _, err = client.SendSpotRequestByKey(path, "DELETE", params)
|
||
|
||
log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
|
||
case 1:
|
||
resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "DELETE", nil)
|
||
log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
|
||
default:
|
||
return errors.New("unknown ws type")
|
||
}
|
||
|
||
return err
|
||
}
|
||
|
||
func (wm *BinanceWebSocketManager) renewListenKey(listenKey string) error {
|
||
// payloadParam := map[string]interface{}{
|
||
// "listenKey": listenKey,
|
||
// "apiKey": wm.apiKey,
|
||
// }
|
||
|
||
// params := map[string]interface{}{
|
||
// "id": getUUID(),
|
||
// "method": "userDataStream.ping",
|
||
// "params": payloadParam,
|
||
// }
|
||
|
||
// if err := wm.ws.WriteJSON(params); err != nil {
|
||
// return err
|
||
// }
|
||
// wm.ws.WriteJSON()
|
||
client, err := wm.createBinanceClient()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
var resp []byte
|
||
|
||
switch wm.wsType {
|
||
case 0:
|
||
path := fmt.Sprintf("/api/v3/userDataStream?listenKey=%s", listenKey)
|
||
resp, _, err = client.SendSpotRequestByKey(path, "PUT", nil)
|
||
log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp)))
|
||
case 1:
|
||
// path := fmt.Sprintf("/fapi/v1/listenKey", listenKey)
|
||
resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "PUT", nil)
|
||
log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp)))
|
||
default:
|
||
return errors.New("unknown ws type")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func getWsTypeName(wsType int) string {
|
||
switch wsType {
|
||
case 0:
|
||
return "spot"
|
||
case 1:
|
||
return "futures"
|
||
default:
|
||
return "unknown"
|
||
}
|
||
}
|
||
func getUUID() string {
|
||
return fmt.Sprintf("%s-%s-%s-%s-%s", randomHex(8), randomHex(4), randomHex(4), randomHex(4), randomHex(12))
|
||
}
|
||
|
||
func randomHex(n int) string {
|
||
rand.New(rand.NewSource(time.Now().UnixNano()))
|
||
hexChars := "0123456789abcdef"
|
||
bytes := make([]byte, n)
|
||
for i := 0; i < n; i++ {
|
||
bytes[i] = hexChars[rand.Intn(len(hexChars))]
|
||
}
|
||
return string(bytes)
|
||
}
|