This commit is contained in:
2025-02-06 11:14:33 +08:00
commit 07847a2d9e
535 changed files with 65131 additions and 0 deletions

View File

@ -0,0 +1,94 @@
package excservice
import (
"go-admin/pkg/httputils"
"github.com/bytedance/sonic"
"go.uber.org/zap"
log "github.com/go-admin-team/go-admin-core/logger"
)
var (
// DefaultHttpClientConfig = &HttpClientConfig{
// Proxy: nil,
// HttpTimeout: 5 * time.Second,
// MaxIdleConns: 10}
)
var (
timeOffset int64 = 0
)
//var INERNAL_KLINE_PERIOD_CONVERTER = map[int]string{
// models.KLINE_1MIN: "1m",
// models.KLINE_3MIN: "3m",
// models.KLINE_5MIN: "5m",
// models.KLINE_15MIN: "15m",
// models.KLINE_30MIN: "30m",
// models.KLINE_60MIN: "1h",
// //models.KLINE_1H: "1h",
// models.KLINE_2H: "2h",
// models.KLINE_4H: "4h",
// models.KLINE_6H: "6h",
// models.KLINE_8H: "8h",
// models.KLINE_12H: "12h",
// models.KLINE_1DAY: "1d",
// models.KLINE_3DAY: "3d",
// models.KLINE_1WEEK: "1w",
// models.KLINE_1MONTH: "1M",
//}
type Filter struct {
FilterType string `json:"filterType"`
MaxPrice string `json:"maxPrice"`
MinPrice string `json:"minPrice"`
TickSize string `json:"tickSize"`
MultiplierUp string `json:"multiplierUp,string"`
MultiplierDown string `json:"multiplierDown,string"`
MinQty string `json:"minQty"`
MaxQty string `json:"maxQty"`
StepSize string `json:"stepSize"`
MinNotional string `json:"minNotional"`
}
//
//type RateLimit struct {
// Interval string `json:"interval"`
// IntervalNum int64 `json:"intervalNum"`
// Limit int64 `json:"limit"`
// RateLimitType string `json:"rateLimitType"`
//}
type TradeSymbol struct {
Symbol string `json:"symbol"`
Status string `json:"status"`
BaseAsset string `json:"baseAsset"` //基础币种
QuoteAsset string `json:"quoteAsset"` //计价币种
BaseAssetPrecision int `json:"baseAssetPrecision"` //基础币种小数点位数
QuotePrecision int `json:"quotePrecision"` //价格小数点位数 添加新字段 quoteAssetPrecision。此字段和 quotePrecision 重复。在未来的版本(v4)中 quotePrecision 会被移除
QuoteAssetPrecision int `json:"quoteAssetPrecision"` //
BaseCommissionPrecision int `json:"baseCommissionPrecision"`
QuoteCommissionPrecision int `json:"quoteCommissionPrecision"`
OrderTypes []string `json:"orderTypes"`
Filters []Filter `json:"filters"`
}
type ExchangeInfo struct {
Timezone string `json:"timezone"`
ServerTime int `json:"serverTime"`
//ExchangeFilters []interface{} `json:"exchangeFilters,omitempty"`
//RateLimits []RateLimit `json:"rateLimits"`
Symbols []TradeSymbol `json:"symbols"`
}
// 获取exchangeInfo
func GetExchangeInfoPro() ([]TradeSymbol, error) {
respData, err := httputils.NewHttpRequestWithFasthttp("GET", apiUrl+"/api/v3/exchangeInfo", "", nil)
if err != nil {
log.Error("获取exchangeInfo", zap.Error(err))
return nil, err
}
var info ExchangeInfo
sonic.Unmarshal(respData, &info)
return info.Symbols, nil
}

View File

@ -0,0 +1,299 @@
package excservice
import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"net/url"
"strconv"
"strings"
"time"
"go.uber.org/zap"
"go-admin/common/global"
"go-admin/common/helper"
"go-admin/models"
"go-admin/pkg/httputils"
"go-admin/pkg/utility"
"github.com/bytedance/sonic"
log "github.com/go-admin-team/go-admin-core/logger"
)
const (
TICKER_URI = "/api/v3/ticker/24hr?symbol=%s"
TICKERS_URI = "ticker/allBookTickers"
DEPTH_URI = "/api/v3/depth?symbol=%s&limit=%d"
ACCOUNT_URI = "/api/v3/account?"
ORDER_URI = "/api/v3/order"
UNFINISHED_ORDERS_INFO = "openOrders?"
KLINE_URI = "klines"
SERVER_TIME_URL = "/api/v3/time"
)
var (
apiUrl = "https://api.binance.com"
//如果上面的baseURL访问有性能问题请访问下面的API集群:
//https://api1.binance.com
//https://api2.binance.com
//https://api3.binance.com
)
func init() {
//err := setTimeOffsetPro()
//if err != nil {
// fmt.Println("setTimeOffsetPro,err:", err)
//}
}
func buildParamsSigned(postForm *url.Values, secretKey string) {
postForm.Set("recvWindow", "60000")
tonce := strconv.FormatInt(time.Now().UnixNano()+timeOffset, 10)[0:13]
postForm.Set("timestamp", tonce)
payload := postForm.Encode()
sign, _ := GetHmacSHA256Sign(secretKey, payload)
postForm.Set("signature", sign)
}
func GetHmacSHA256Sign(secret, params string) (string, error) {
mac := hmac.New(sha256.New, []byte(secret))
_, err := mac.Write([]byte(params))
if err != nil {
return "", err
}
return hex.EncodeToString(mac.Sum(nil)), nil
}
// 获取服务器时间
func setTimeOffsetPro() error {
respData, err := httputils.NewHttpRequestWithFasthttp("GET", apiUrl+SERVER_TIME_URL, "", nil)
if err != nil {
return err
}
var bodyDataMap map[string]interface{}
err = json.Unmarshal(respData, &bodyDataMap)
if err != nil {
log.Error(string(respData))
return err
}
stime := int64(utility.ToInt(bodyDataMap["serverTime"]))
st := time.Unix(stime/1000, 1000000*(stime%1000))
lt := time.Now()
offset := st.Sub(lt).Nanoseconds()
timeOffset = offset
return nil
}
// GetTicker 获取24小时行情
func GetTicker(coin, currency string) (models.Ticker24, error) {
par := strings.ToUpper(coin + currency)
tickerUri := apiUrl + fmt.Sprintf(TICKER_URI, par)
var ticker models.Ticker24
respData, err := httputils.NewHttpRequestWithFasthttp("GET", tickerUri, "", nil)
if err != nil {
log.Error("GetTicker", zap.Error(err))
return ticker, err
}
var tickerMap map[string]interface{}
err = json.Unmarshal(respData, &tickerMap)
if err != nil {
log.Error("GetTicker", zap.ByteString("respData", respData), zap.Error(err))
return ticker, err
}
ticker.LastPrice = tickerMap["lastPrice"].(string)
ticker.LowPrice = tickerMap["lowPrice"].(string)
ticker.HighPrice = tickerMap["highPrice"].(string)
ticker.Volume = tickerMap["volume"].(string)
ticker.QuoteVolume = tickerMap["quoteVolume"].(string)
ticker.ChangePercent = tickerMap["priceChangePercent"].(string)
ticker.OpenPrice = tickerMap["openPrice"].(string)
return ticker, nil
}
// GetTickerBySymbols 获取24小时行情 symbols symbols参数可接受的格式 ["BTCUSDT","BNBUSDT"]
func GetTickerBySymbols(symbols string) ([]models.Ticker24, error) {
tickerUri := apiUrl + "/api/v3/ticker/24hr"
respData, err := httputils.NewHttpRequestWithFasthttp("GET", tickerUri, "", nil)
if err != nil {
log.Error("GetTicker", zap.Error(err))
return nil, err
}
var tickerList []interface{}
err = json.Unmarshal(respData, &tickerList)
if err != nil {
log.Error("GetTickerBySymbols", zap.ByteString("respData", respData), zap.Error(err))
return nil, err
}
list := make([]models.Ticker24, 0, len(tickerList))
for _, t := range tickerList {
tickerMap := t.(map[string]interface{})
if tickerMap == nil {
continue
}
var ticker models.Ticker24
ticker.LastPrice = tickerMap["lastPrice"].(string)
ticker.LowPrice = tickerMap["lowPrice"].(string)
ticker.HighPrice = tickerMap["highPrice"].(string)
ticker.Volume = tickerMap["volume"].(string)
ticker.QuoteVolume = tickerMap["quoteVolume"].(string)
ticker.ChangePercent = tickerMap["priceChangePercent"].(string)
ticker.OpenPrice = tickerMap["openPrice"].(string)
ticker.Symbol = tickerMap["symbol"].(string)
list = append(list, ticker)
}
return list, nil
}
// GetKlinePro 获取k线--现货行情接口
func GetKlinePro(coin, currency string, period string, size int) ([]models.Kline, error) {
par := strings.ToUpper(coin + currency)
periodS := period //, isOk := INERNAL_KLINE_PERIOD_CONVERTER[period]
//if isOk != true {
// periodS = "M1"
//}
key := fmt.Sprintf("%s:%s:%s", global.K_SPOT, par, period)
//获取缓存
klineStrs, err := helper.DefaultRedis.GetAllSortSet(key)
if err != nil {
return nil, err
}
if len(klineStrs) > 0 && len(klineStrs) >= 500 {
klines := make([]models.Kline, 0)
for _, item := range klineStrs {
var kline models.Kline
err := sonic.Unmarshal([]byte(item), &kline)
if err == nil {
klines = append(klines, kline)
}
}
return klines, nil
}
//没有缓存 重新获取
urlKline := apiUrl + "/api/v3/klines?symbol=" + par + "&interval=" + periodS + "&limit=" + utility.IntTostring(size)
respData, err := httputils.NewHttpRequestWithFasthttp("GET", urlKline, "", nil)
if err != nil {
return nil, err
}
var bodyDataMap []interface{}
err = json.Unmarshal(respData, &bodyDataMap)
if err != nil {
log.Error("GetKlinePro", zap.ByteString("respData", respData), zap.Error(err))
return nil, err
}
var klines []models.Kline
for _, _record := range bodyDataMap {
r := models.Kline{}
record := _record.([]interface{})
times := utility.ToFloat64(record[0]) //to unix timestramp
// 超出10位的 处理为
if times > 9999999999 {
r.Timestamp = int64(times) / 1000
} else {
r.Timestamp = int64(times)
}
r.Open = record[1].(string)
r.High = record[2].(string)
r.Low = record[3].(string)
r.Close = record[4].(string)
r.Vol = record[5].(string)
r.QuoteVolume = record[7].(string)
klines = append(klines, r)
member, err := sonic.Marshal(r)
if err == nil {
err = helper.DefaultRedis.SignelAdd(key, float64(r.Timestamp), string(member))
if err != nil {
log.Error("保存k线数据失败:", key, err)
}
}
}
return klines, nil
}
// GetTrades 非个人,整个交易所的交易记录
// 注意since is fromId
func GetTrades(coin, currency string) ([]models.NewDealPush, error) {
param := url.Values{}
param.Set("symbol", strings.ToUpper(coin+currency))
param.Set("limit", "50")
//if since > 0 {
// param.Set("fromId", strconv.Itoa(int(since)))
//}
urlTrade := apiUrl + "/api/v3/trades?" + param.Encode()
resp, err := httputils.NewHttpRequestWithFasthttp("GET", urlTrade, "", nil)
if err != nil {
return nil, err
}
var bodyDataMap []interface{}
err = json.Unmarshal(resp, &bodyDataMap)
if err != nil {
log.Error("GetTrades", zap.ByteString("respData", resp), zap.Error(err))
return nil, err
}
var trades []models.NewDealPush
for _, v := range bodyDataMap {
m := v.(map[string]interface{})
ty := 2
if m["isBuyerMaker"].(bool) {
ty = 1
}
trades = append(trades, models.NewDealPush{
DealId: utility.ToInt64(m["id"]),
Type: ty,
Num: utility.ToFloat64(m["qty"]),
Price: utility.ToFloat64(m["price"]),
CreateTime: utility.ToInt64(m["time"]),
})
}
return trades, nil
}
// GetDepth 获取深度
func GetDepth(size int, coin, currency string) (models.DepthBin, error) {
if size <= 5 {
size = 5
} else if size <= 10 {
size = 10
} else if size <= 20 {
size = 20
} else if size <= 50 {
size = 50
} else if size <= 100 {
size = 100
} else if size <= 500 {
size = 500
} else {
size = 1000
}
urlDep := fmt.Sprintf(apiUrl+DEPTH_URI, strings.ToUpper(coin+currency), size)
respFive, err := httputils.NewHttpRequestWithFasthttp("GET", urlDep, "", nil)
if err != nil {
return models.DepthBin{}, err
}
d := models.DepthBin{}
err = sonic.Unmarshal(respFive, &d)
if err != nil {
fmt.Println("GetDepth json unmarshal error for ", string(respFive), zap.Error(err))
return models.DepthBin{}, err
}
return d, nil
}

View File

@ -0,0 +1,111 @@
package excservice
import (
"go-admin/models/futuresdto"
"go-admin/pkg/utility"
"go-admin/services/binanceservice"
"strconv"
"github.com/bytedance/sonic"
log "github.com/go-admin-team/go-admin-core/logger"
)
/*
用户订单订阅处理
- @msg 消息内容
- @listenType 订阅类型 0-现货 1-合约
*/
func ReceiveListen(msg []byte, listenType int) (reconnect bool, err error) {
var dataMap map[string]interface{}
err = sonic.Unmarshal(msg, &dataMap)
if err != nil {
log.Error("接收ws 反序列化失败:", err)
return
}
event, exits := dataMap["e"]
if !exits {
log.Error("不存在event")
return
}
switch event {
//listenKey过期
case "listenKeyExpired":
log.Info("listenKey过期", string(msg))
return true, nil
//订单变更
case "ORDER_TRADE_UPDATE":
log.Info("ORDER_TRADE_UPDATE 推送:", string(msg))
//现货
if listenType == 0 {
var mapData map[string]interface{}
err = sonic.Unmarshal(msg, &mapData)
if err != nil {
log.Error("订单变更处理失败", err)
break
}
utility.SafeGo(func() {
binanceservice.ChangeSpotOrder(mapData)
})
} else {
var data futuresdto.OrderTradeUpdate
err = sonic.Unmarshal(msg, &data)
if err != nil {
log.Error("订单变更处理失败", err)
break
}
utility.SafeGo(func() {
binanceservice.ChangeFutureOrder(data.OrderDetails)
})
}
//订单更新
case "executionReport":
log.Info("executionReport 推送:", string(msg))
if listenType == 0 { //现货
binanceservice.ChangeSpotOrder(dataMap)
} else if listenType == 1 { //合约
binanceservice.ChangeFutureOrder(dataMap)
} else {
log.Error("executionReport 不支持的订阅类型", strconv.Itoa(listenType))
}
//杠杆倍数等账户配置 更新推送
case "ACCOUNT_CONFIG_UPDATE":
log.Info(string(msg))
//追加保证金
case "MARGIN_CALL":
log.Info(string(msg))
//条件订单(TP/SL)触发后拒绝更新推送
case "CONDITIONAL_ORDER_TRIGGER_REJECT":
or, exits := dataMap["or"].(string)
if exits {
var data futuresdto.OrderTriggerReject
sonic.UnmarshalString(or, &data)
if data.OrderNo > 0 {
log.Info("订单号【%v】止盈止损触发后被拒绝%s", data.OrderNo, data.Reason)
}
}
case "eventStreamTerminated":
log.Info("账户数据流被终止 type:", getWsTypeName(listenType))
default:
log.Info("未知事件 内容:", string(msg))
log.Info("未知事件", event)
return false, nil
}
return false, nil
}

View File

@ -0,0 +1,600 @@
package excservice
import (
"context"
"crypto/tls"
"errors"
"fmt"
"go-admin/common/global"
"go-admin/common/helper"
"go-admin/models/binancedto"
"go-admin/models/commondto"
"go-admin/pkg/jsonhelper"
"go-admin/pkg/utility"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"time"
"github.com/bytedance/sonic"
log "github.com/go-admin-team/go-admin-core/logger"
"github.com/gorilla/websocket"
"golang.org/x/net/proxy"
)
type BinanceWebSocketManager struct {
ws *websocket.Conn
stopChannel chan struct{}
url string
/* 0-现货 1-合约 */
wsType int
apiKey string
apiSecret string
proxyType string
proxyAddress string
reconnect chan struct{}
isStopped bool // 标记 WebSocket 是否已主动停止
mu sync.Mutex // 用于控制并发访问 isStopped
cancelFunc context.CancelFunc
listenKey string // 新增字段
}
// 已有连接
var SpotSockets = map[string]*BinanceWebSocketManager{}
var FutureSockets = map[string]*BinanceWebSocketManager{}
func NewBinanceWebSocketManager(wsType int, apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager {
url := ""
switch wsType {
case 0:
url = "wss://stream.binance.com:9443/ws"
case 1:
url = "wss://fstream.binance.com/ws"
}
return &BinanceWebSocketManager{
stopChannel: make(chan struct{}, 10),
reconnect: make(chan struct{}, 10),
isStopped: false,
url: url,
wsType: wsType,
apiKey: apiKey,
apiSecret: apiSecret,
proxyType: proxyType,
proxyAddress: proxyAddress,
}
}
func (wm *BinanceWebSocketManager) Start() {
utility.SafeGo(wm.run)
// wm.run()
}
// 重启连接
func (wm *BinanceWebSocketManager) Restart(apiKey, apiSecret, proxyType, proxyAddress string) *BinanceWebSocketManager {
wm.apiKey = apiKey
wm.apiSecret = apiSecret
wm.proxyType = proxyType
wm.proxyAddress = proxyAddress
wm.reconnect <- struct{}{}
return wm
}
func Restart(wm *BinanceWebSocketManager) {
wm.reconnect <- struct{}{}
}
func (wm *BinanceWebSocketManager) run() {
ctx, cancel := context.WithCancel(context.Background())
wm.cancelFunc = cancel
utility.SafeGo(wm.handleSignal)
// 计算错误记录键
errKey := fmt.Sprintf(global.API_WEBSOCKET_ERR, wm.apiKey)
errMessage := commondto.WebSocketErr{Time: time.Now()}
helper.DefaultRedis.SetString(errKey, jsonhelper.ToJsonString(errMessage))
for {
select {
case <-ctx.Done():
return
default:
if err := wm.connect(ctx); err != nil {
wm.handleConnectionError(errKey, err)
if wm.isErrorCountExceeded(errKey) {
log.Error("连接 %s WebSocket 时出错次数过多,停止 WebSocket 管理器: %v", wm.wsType, wm.apiKey)
wm.Stop()
return
}
time.Sleep(5 * time.Second)
continue
}
<-wm.stopChannel
log.Info("停止 %s WebSocket 管理器...", getWsTypeName(wm.wsType))
wm.Stop()
return
}
}
}
// handleConnectionError 处理 WebSocket 连接错误
func (wm *BinanceWebSocketManager) handleConnectionError(errKey string, err error) {
// 从 Redis 获取错误记录
var errMessage commondto.WebSocketErr
val, _ := helper.DefaultRedis.GetString(errKey)
if val != "" {
sonic.UnmarshalString(val, &errMessage)
}
// 更新错误记录
errMessage.Count++
errMessage.Time = time.Now()
errMessage.ErrorMessage = err.Error()
// 将错误记录保存到 Redis
if data, err := sonic.MarshalString(errMessage); err == nil {
helper.DefaultRedis.SetString(errKey, data)
}
// 记录错误日志
log.Error("连接 %s WebSocket 时出错: %v, 错误: %v", wm.wsType, wm.apiKey, err)
}
// isErrorCountExceeded 检查错误次数是否超过阈值
func (wm *BinanceWebSocketManager) isErrorCountExceeded(errKey string) bool {
val, _ := helper.DefaultRedis.GetString(errKey)
if val == "" {
return false
}
var errMessage commondto.WebSocketErr
if err := sonic.UnmarshalString(val, &errMessage); err != nil {
return false
}
return errMessage.Count >= 5
}
// 处理终止信号
func (wm *BinanceWebSocketManager) handleSignal() {
ch := make(chan os.Signal)
signal.Notify(ch, os.Interrupt)
<-ch
wm.Stop()
}
func (wm *BinanceWebSocketManager) connect(ctx context.Context) error {
dialer, err := wm.getDialer()
if err != nil {
return err
}
listenKey, err := wm.getListenKey()
if err != nil {
return err
}
wm.listenKey = listenKey
url := fmt.Sprintf("%s/%s", wm.url, listenKey)
wm.ws, _, err = dialer.Dial(url, nil)
if err != nil {
return err
}
log.Info(fmt.Sprintf("已连接到 Binance %s WebSocket【%s】 key:%s", getWsTypeName(wm.wsType), wm.apiKey, listenKey))
// Ping处理
wm.ws.SetPingHandler(func(appData string) error {
log.Info(fmt.Sprintf("收到 wstype: %v key:%s Ping 消息【%s】", wm.wsType, wm.apiKey, appData))
for x := 0; x < 5; x++ {
if err := wm.ws.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(time.Second*10)); err != nil {
log.Error("binance 回应pong失败 次数:", strconv.Itoa(x), " err:", err)
time.Sleep(time.Second * 1)
continue
}
break
}
setLastTime(wm)
return nil
})
// utility.SafeGoParam(wm.restartConnect, ctx)
utility.SafeGo(func() { wm.startListenKeyRenewal2(ctx) })
utility.SafeGo(func() { wm.readMessages(ctx) })
utility.SafeGo(func() { wm.handleReconnect(ctx) })
return nil
}
// 更新最后通信时间
func setLastTime(wm *BinanceWebSocketManager) {
subKey := fmt.Sprintf(global.USER_SUBSCRIBE, wm.apiKey)
val, _ := helper.DefaultRedis.GetString(subKey)
now := time.Now()
var data binancedto.UserSubscribeState
if val != "" {
sonic.Unmarshal([]byte(val), &data)
}
if wm.wsType == 0 {
data.SpotLastTime = &now
} else {
data.FuturesLastTime = &now
}
val, _ = sonic.MarshalString(&data)
if val != "" {
helper.DefaultRedis.SetString(subKey, val)
}
}
func (wm *BinanceWebSocketManager) getDialer() (*websocket.Dialer, error) {
if wm.proxyAddress == "" {
return &websocket.Dialer{}, nil
}
if !strings.HasPrefix(wm.proxyAddress, "http://") && !strings.HasPrefix(wm.proxyAddress, "https://") && !strings.HasPrefix(wm.proxyAddress, "socks5://") {
wm.proxyAddress = wm.proxyType + "://" + wm.proxyAddress
}
proxyURL, err := url.Parse(wm.proxyAddress)
if err != nil {
return nil, fmt.Errorf("failed to parse proxy URL: %v", err)
}
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
}
switch proxyURL.Scheme {
case "socks5":
return wm.createSocks5Dialer(proxyURL)
case "http", "https":
transport.Proxy = http.ProxyURL(proxyURL)
return &websocket.Dialer{Proxy: transport.Proxy, TLSClientConfig: transport.TLSClientConfig}, nil
default:
return nil, fmt.Errorf("unsupported proxy scheme: %s", proxyURL.Scheme)
}
}
func (wm *BinanceWebSocketManager) createSocks5Dialer(proxyURL *url.URL) (*websocket.Dialer, error) {
auth := &proxy.Auth{}
if proxyURL.User != nil {
auth.User = proxyURL.User.Username()
auth.Password, _ = proxyURL.User.Password()
}
socksDialer, err := proxy.SOCKS5("tcp", proxyURL.Host, auth, proxy.Direct)
if err != nil {
return nil, fmt.Errorf("failed to create SOCKS5 proxy dialer: %v", err)
}
return &websocket.Dialer{
NetDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
return socksDialer.Dial(network, address)
},
}, nil
}
// 复用创建HTTP客户端的逻辑
func (wm *BinanceWebSocketManager) createBinanceClient() (*helper.BinanceClient, error) {
return helper.NewBinanceClient(wm.apiKey, wm.apiSecret, wm.proxyType, wm.proxyAddress)
}
// 获取listenKey
func (wm *BinanceWebSocketManager) getListenKey() (string, error) {
client, err := wm.createBinanceClient()
if err != nil {
return "", err
}
var resp []byte
switch wm.wsType {
case 0:
resp, _, err = client.SendSpotRequestByKey("/api/v3/userDataStream", "POST", nil)
case 1:
resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "POST", nil)
default:
log.Error("链接类型错误")
return "", errors.New("链接类型错误")
}
if err != nil {
return "", err
}
var dataMap map[string]interface{}
if err := sonic.Unmarshal(resp, &dataMap); err != nil {
return "", err
}
if listenKey, ok := dataMap["listenKey"]; ok {
return listenKey.(string), nil
}
return "", errors.New("listenKey 不存在")
}
// 接收消息
func (wm *BinanceWebSocketManager) readMessages(ctx context.Context) {
defer wm.ws.Close()
for {
select {
case <-ctx.Done():
return
default:
if wm.isStopped {
return
}
_, msg, err := wm.ws.ReadMessage()
if err != nil && strings.Contains(err.Error(), "websocket: close") {
if !wm.isStopped {
wm.reconnect <- struct{}{}
}
log.Error("websocket 关闭")
return
} else if err != nil {
log.Error("读取消息时出错: %v", err)
return
}
wm.handleOrderUpdate(msg)
}
}
}
func (wm *BinanceWebSocketManager) handleOrderUpdate(msg []byte) {
setLastTime(wm)
if reconnect, _ := ReceiveListen(msg, wm.wsType); reconnect {
wm.reconnect <- struct{}{}
}
}
func (wm *BinanceWebSocketManager) Stop() {
wm.mu.Lock()
defer wm.mu.Unlock()
if wm.isStopped {
return
}
wm.isStopped = true
close(wm.stopChannel)
if wm.cancelFunc != nil {
wm.cancelFunc()
}
if wm.ws != nil {
if err := wm.ws.Close(); err != nil {
log.Error(fmt.Sprintf("key【%s】close失败", wm.apiKey), err)
} else {
log.Info(fmt.Sprintf("key【%s】close", wm.apiKey))
}
}
}
// 重连机制
func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) {
maxRetries := 5 // 最大重试次数
retryCount := 0
for {
select {
case <-ctx.Done():
return
case <-wm.reconnect:
if wm.isStopped {
return
}
log.Warn("WebSocket 连接断开,尝试重连...")
if wm.ws != nil {
wm.ws.Close()
}
// 取消旧的上下文
if wm.cancelFunc != nil {
wm.cancelFunc()
}
for {
newCtx, cancel := context.WithCancel(context.Background())
wm.cancelFunc = cancel // 更新 cancelFunc
if err := wm.connect(newCtx); err != nil {
log.Errorf("重连失败: %v", err)
cancel()
retryCount++
if retryCount >= maxRetries {
log.Error("重连失败次数过多,退出重连逻辑")
return
}
time.Sleep(5 * time.Second)
continue
}
return
}
}
}
}
// 定期删除listenkey 并重启ws
func (wm *BinanceWebSocketManager) startListenKeyRenewal(ctx context.Context, listenKey string) {
time.Sleep(30 * time.Minute)
select {
case <-ctx.Done():
return
default:
if err := wm.deleteListenKey(listenKey); err != nil {
log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
} else {
log.Debug("Successfully delete listenKey")
wm.reconnect <- struct{}{}
}
}
// ticker := time.NewTicker(5 * time.Minute)
// defer ticker.Stop()
// for {
// select {
// case <-ticker.C:
// if wm.isStopped {
// return
// }
// if err := wm.deleteListenKey(listenKey); err != nil {
// log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
// } else {
// log.Debug("Successfully delete listenKey")
// wm.reconnect <- struct{}{}
// return
// }
// case <-ctx.Done():
// return
// }
// }
}
// 定时续期
func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) {
ticker := time.NewTicker(30 * time.Minute)
defer func() {
log.Debug("定时续期任务退出 key:", wm.apiKey)
ticker.Stop()
}()
for {
select {
case <-ticker.C:
if wm.isStopped {
return
}
if err := wm.renewListenKey(wm.listenKey); err != nil {
log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err)
}
case <-ctx.Done():
return
}
}
}
/*
删除listenkey
*/
func (wm *BinanceWebSocketManager) deleteListenKey(listenKey string) error {
client, err := wm.createBinanceClient()
if err != nil {
return err
}
var resp []byte
switch wm.wsType {
case 0:
path := fmt.Sprintf("/api/v3/userDataStream")
params := map[string]interface{}{
"listenKey": listenKey,
}
resp, _, err = client.SendSpotRequestByKey(path, "DELETE", params)
log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
case 1:
resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "DELETE", nil)
log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp)))
default:
return errors.New("unknown ws type")
}
return err
}
func (wm *BinanceWebSocketManager) renewListenKey(listenKey string) error {
// payloadParam := map[string]interface{}{
// "listenKey": listenKey,
// "apiKey": wm.apiKey,
// }
// params := map[string]interface{}{
// "id": getUUID(),
// "method": "userDataStream.ping",
// "params": payloadParam,
// }
// if err := wm.ws.WriteJSON(params); err != nil {
// return err
// }
// wm.ws.WriteJSON()
client, err := wm.createBinanceClient()
if err != nil {
return err
}
var resp []byte
switch wm.wsType {
case 0:
path := fmt.Sprintf("/api/v3/userDataStream?listenKey=%s", listenKey)
resp, _, err = client.SendSpotRequestByKey(path, "PUT", nil)
log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp)))
case 1:
// path := fmt.Sprintf("/fapi/v1/listenKey", listenKey)
resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "PUT", nil)
log.Debug(fmt.Sprintf("renewListenKey resp: %s", string(resp)))
default:
return errors.New("unknown ws type")
}
return nil
}
func getWsTypeName(wsType int) string {
switch wsType {
case 0:
return "spot"
case 1:
return "futures"
default:
return "unknown"
}
}
func getUUID() string {
return fmt.Sprintf("%s-%s-%s-%s-%s", randomHex(8), randomHex(4), randomHex(4), randomHex(4), randomHex(12))
}
func randomHex(n int) string {
rand.New(rand.NewSource(time.Now().UnixNano()))
hexChars := "0123456789abcdef"
bytes := make([]byte, n)
for i := 0; i < n; i++ {
bytes[i] = hexChars[rand.Intn(len(hexChars))]
}
return string(bytes)
}

View File

@ -0,0 +1,175 @@
package excservice
import (
"errors"
"fmt"
"strconv"
"time"
"go-admin/models"
"go-admin/pkg/timehelper"
"go-admin/pkg/utility"
"github.com/bytedance/sonic"
)
type BinanceWs struct {
baseURL string
combinedBaseURL string
proxyUrl string
WorkType string
wsConns []*WsConn
tickerCallback func(models.Ticker24, string, string)
forceCallback func(models.ForceOrder, string, string)
depthCallback func(models.DepthBin, string, string)
tradeCallback func(models.NewDealPush, string, string)
klineCallback func(models.Kline, int, string, string)
allBack func(msg []byte)
allBackKline func(msg []byte, tradeSet models.TradeSet)
}
func NewBinanceWs(wsbaseURL, proxyUrl string) *BinanceWs {
return &BinanceWs{
baseURL: wsbaseURL,
combinedBaseURL: "wss://stream.binance.com:9443/stream?streams=",
proxyUrl: proxyUrl,
}
}
func (bnWs *BinanceWs) SetProxyUrl(proxyUrl string) {
bnWs.proxyUrl = proxyUrl
}
func (bnWs *BinanceWs) SetBaseUrl(baseURL string) {
bnWs.baseURL = baseURL
}
func (bnWs *BinanceWs) SetCombinedBaseURL(combinedBaseURL string) {
bnWs.combinedBaseURL = combinedBaseURL
}
func (bnWs *BinanceWs) SetAllCallbacks(allBack func(msg []byte), allBackKline func(msg []byte, tradeSet models.TradeSet)) {
if bnWs.allBack == nil {
bnWs.allBack = allBack
}
if bnWs.allBackKline == nil {
bnWs.allBackKline = allBackKline
}
}
// 订阅通用函数
func (bnWs *BinanceWs) subscribe(endpoint string, handle func(msg []byte) error) {
wsConn := NewWsBuilder().
WsUrl(endpoint).
AutoReconnect().
ProtoHandleFunc(handle).
ProxyUrl(bnWs.proxyUrl).
ReconnectInterval(time.Millisecond * 5).
Build()
if wsConn == nil {
return
}
bnWs.wsConns = append(bnWs.wsConns, wsConn)
go bnWs.exitHandler(wsConn)
}
func (bnWs *BinanceWs) Close() {
for _, con := range bnWs.wsConns {
con.CloseWs()
}
}
func (bnWs *BinanceWs) Subscribe(streamName string, tradeSet models.TradeSet, callback func(msg []byte, tradeSet models.TradeSet)) error {
endpoint := bnWs.baseURL + streamName
handle := func(msg []byte) error {
callback(msg, tradeSet)
return nil
}
bnWs.subscribe(endpoint, handle)
return nil
}
func (bnWs *BinanceWs) exitHandler(c *WsConn) {
pingTicker := time.NewTicker(1 * time.Minute)
pongTicker := time.NewTicker(30 * time.Second)
defer func() {
pingTicker.Stop()
pongTicker.Stop()
c.CloseWs()
if err := recover(); err != nil {
fmt.Printf("CloseWs, panic: %s\r\n", err)
}
}()
for {
select {
case t := <-pingTicker.C:
c.SendPingMessage([]byte(strconv.Itoa(int(t.UnixNano() / int64(time.Millisecond)))))
case t := <-pongTicker.C:
c.SendPongMessage([]byte(strconv.Itoa(int(t.UnixNano() / int64(time.Millisecond)))))
}
}
}
func parseJsonToMap(msg []byte) (map[string]interface{}, error) {
datamap := make(map[string]interface{})
err := sonic.Unmarshal(msg, &datamap)
return datamap, err
}
func handleForceOrder(msg []byte, tradeSet models.TradeSet, callback func(models.ForceOrder, string, string)) error {
datamap, err := parseJsonToMap(msg)
if err != nil {
return fmt.Errorf("json unmarshal error: %v", err)
}
msgType, ok := datamap["e"].(string)
if !ok || msgType != "forceOrder" {
return errors.New("unknown message type")
}
datamapo := datamap["o"].(map[string]interface{})
order := models.ForceOrder{
Side: datamapo["S"].(string),
Symbol: datamapo["s"].(string),
Ordertype: datamapo["o"].(string),
TimeInForce: datamapo["f"].(string),
Num: utility.ToFloat64(datamapo["q"]),
Price: utility.ToFloat64(datamapo["p"]),
AvgPrice: utility.ToFloat64(datamapo["ap"]),
State: datamapo["X"].(string),
CreateTime: timehelper.IntToTime(utility.ToInt64(datamapo["T"])),
}
callback(order, tradeSet.Coin, tradeSet.Currency)
return nil
}
// SubscribeAll 订阅 组合streams的URL格式为 /stream?streams=<streamName1>/<streamName2>/<streamName3>
// 订阅组合streams时事件payload会以这样的格式封装: {"stream":"<streamName>","data":<rawPayload>}
// 单一原始 streams 格式为 /ws/<streamName>
func (bnWs *BinanceWs) SubscribeAll(streamName string) error {
endpoint := bnWs.baseURL + streamName
handle := func(msg []byte) error {
bnWs.allBack(msg)
return nil
}
bnWs.subscribe(endpoint, handle)
return nil
}
// SubscribeAllKline 订阅kline推送 组合streams的URL格式为 /stream?streams=<streamName1>/<streamName2>/<streamName3>
func (bnWs *BinanceWs) SubscribeAllKline(streamName string, tradeSet models.TradeSet) error {
endpoint := bnWs.baseURL + streamName
handle := func(msg []byte) error {
bnWs.allBackKline(msg, tradeSet)
return nil
}
bnWs.subscribe(endpoint, handle)
return nil
}

View File

@ -0,0 +1,442 @@
package excservice
import (
"errors"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"sync"
"time"
"go.uber.org/zap"
"go-admin/pkg/utility"
"github.com/bytedance/sonic"
log "github.com/go-admin-team/go-admin-core/logger"
"github.com/gorilla/websocket"
)
type WsConfig struct {
WsUrl string
ProxyUrl string
ReqHeaders map[string][]string //连接的时候加入的头部信息
HeartbeatIntervalTime time.Duration //
HeartbeatData func() []byte //心跳数据2
IsAutoReconnect bool
ProtoHandleFunc func([]byte) error //协议处理函数
DecompressFunc func([]byte) ([]byte, error) //解压函数
ErrorHandleFunc func(err error)
ConnectSuccessAfterSendMessage func() []byte //for reconnect
IsDump bool
readDeadLineTime time.Duration
reconnectInterval time.Duration
}
var dialer = &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 30 * time.Second,
EnableCompression: true,
}
type WsConn struct {
c *websocket.Conn
WsConfig
writeBufferChan chan []byte
pingMessageBufferChan chan []byte
pongMessageBufferChan chan []byte
closeMessageBufferChan chan []byte
subs [][]byte
close chan bool
reConnectLock *sync.Mutex
}
type WsBuilder struct {
wsConfig *WsConfig
}
func NewWsBuilder() *WsBuilder {
return &WsBuilder{&WsConfig{
ReqHeaders: make(map[string][]string, 1),
reconnectInterval: time.Second * 10,
}}
}
func (b *WsBuilder) WsUrl(wsUrl string) *WsBuilder {
b.wsConfig.WsUrl = wsUrl
return b
}
func (b *WsBuilder) ProxyUrl(proxyUrl string) *WsBuilder {
b.wsConfig.ProxyUrl = proxyUrl
return b
}
func (b *WsBuilder) ReqHeader(key, value string) *WsBuilder {
b.wsConfig.ReqHeaders[key] = append(b.wsConfig.ReqHeaders[key], value)
return b
}
func (b *WsBuilder) AutoReconnect() *WsBuilder {
b.wsConfig.IsAutoReconnect = true
return b
}
func (b *WsBuilder) Dump() *WsBuilder {
b.wsConfig.IsDump = true
return b
}
func (b *WsBuilder) Heartbeat(heartbeat func() []byte, t time.Duration) *WsBuilder {
b.wsConfig.HeartbeatIntervalTime = t
b.wsConfig.HeartbeatData = heartbeat
return b
}
func (b *WsBuilder) ReconnectInterval(t time.Duration) *WsBuilder {
b.wsConfig.reconnectInterval = t
return b
}
func (b *WsBuilder) ProtoHandleFunc(f func([]byte) error) *WsBuilder {
b.wsConfig.ProtoHandleFunc = f
return b
}
func (b *WsBuilder) DecompressFunc(f func([]byte) ([]byte, error)) *WsBuilder {
b.wsConfig.DecompressFunc = f
return b
}
func (b *WsBuilder) ErrorHandleFunc(f func(err error)) *WsBuilder {
b.wsConfig.ErrorHandleFunc = f
return b
}
func (b *WsBuilder) ConnectSuccessAfterSendMessage(msg func() []byte) *WsBuilder {
b.wsConfig.ConnectSuccessAfterSendMessage = msg
return b
}
func (b *WsBuilder) Build() *WsConn {
wsConn := &WsConn{WsConfig: *b.wsConfig}
return wsConn.NewWs()
}
func (ws *WsConn) NewWs() *WsConn {
if ws.HeartbeatIntervalTime == 0 {
ws.readDeadLineTime = time.Minute
} else {
ws.readDeadLineTime = ws.HeartbeatIntervalTime * 2
}
if err := ws.connect(); err != nil {
log.Error("[" + ws.WsUrl + "] " + err.Error())
return nil
}
ws.close = make(chan bool, 1)
ws.pingMessageBufferChan = make(chan []byte, 10)
ws.pongMessageBufferChan = make(chan []byte, 10)
ws.closeMessageBufferChan = make(chan []byte, 10)
ws.writeBufferChan = make(chan []byte, 10)
ws.reConnectLock = new(sync.Mutex)
go ws.writeRequest()
go ws.receiveMessage()
//if ws.ConnectSuccessAfterSendMessage != nil {
// msg := ws.ConnectSuccessAfterSendMessage()
// if msg != nil{
// ws.SendMessage(msg)
// log.ErrorLogMsg("[ws] " + ws.WsUrl + " execute the connect success after send message=" + string(msg))
// }else {
// log.ErrorLogMsg("执行重新连接后执行的登入函数[ws] " + ws.WsUrl + " ,send message=" + string(msg))
// }
//}
return ws
}
func (ws *WsConn) connect() error {
const maxRetries = 5 // 最大重试次数
const retryDelay = 2 * time.Second // 每次重试的延迟时间
var wsConn *websocket.Conn
var resp *http.Response
var err error
// 重试机制
for attempt := 1; attempt <= maxRetries; attempt++ {
if ws.ProxyUrl != "" {
proxy, err := url.Parse(ws.ProxyUrl)
if err == nil {
// log.Info("[ws][%s] proxy url:%s", zap.String("ws.WsUrl", ws.WsUrl))
dialer.Proxy = http.ProxyURL(proxy)
} else {
log.Error("[ws][" + ws.WsUrl + "] parse proxy url [" + ws.ProxyUrl + "] err: " + err.Error())
}
}
// 尝试连接
wsConn, resp, err = dialer.Dial(ws.WsUrl, http.Header(ws.ReqHeaders))
if err != nil {
log.Error(fmt.Sprintf("[ws][%s] Dial attempt %d failed: %s", ws.WsUrl, attempt, err.Error()))
// 如果开启了请求数据转储,打印响应信息
if ws.IsDump && resp != nil {
dumpData, _ := httputil.DumpResponse(resp, true)
log.Info(fmt.Sprintf("[ws][%s] Response dump: %s", ws.WsUrl, string(dumpData)))
}
// 达到最大重试次数,返回错误
if attempt == maxRetries {
return fmt.Errorf("达到最大重试次数 [ws][%s]: %v", ws.WsUrl, err)
}
// 等待一段时间后重试
time.Sleep(retryDelay)
} else {
// 连接成功,退出循环
break
}
}
// 设置读取超时时间
wsConn.SetReadDeadline(time.Now().Add(ws.readDeadLineTime))
// 如果开启了请求数据转储,打印响应信息
if ws.IsDump && resp != nil {
dumpData, _ := httputil.DumpResponse(resp, true)
log.Info(fmt.Sprintf("[ws][%s] Response dump: %s", ws.WsUrl, string(dumpData)))
}
// 记录连接成功的日志
log.Info("[ws][" + ws.WsUrl + "] connected")
ws.c = wsConn
return nil
}
func (ws *WsConn) reconnect() {
ws.reConnectLock.Lock()
defer ws.reConnectLock.Unlock()
ws.c.Close() //主动关闭一次
var err error
for retry := 1; retry <= 100; retry++ {
err = ws.connect()
if err != nil {
log.Error("[ws] [" + ws.WsUrl + "] websocket reconnect fail , " + err.Error())
} else {
break
}
time.Sleep(ws.WsConfig.reconnectInterval * time.Duration(retry))
}
if err != nil {
log.Error("[ws] [" + ws.WsUrl + "] retry connect 100 count fail , begin exiting. ")
ws.CloseWs()
if ws.ErrorHandleFunc != nil {
ws.ErrorHandleFunc(errors.New("retry reconnect fail"))
}
} else {
//re subscribe
if ws.ConnectSuccessAfterSendMessage != nil {
msg := ws.ConnectSuccessAfterSendMessage()
if msg != nil {
ws.SendMessage(msg)
//log.ErrorLogMsg("[ws] " + ws.WsUrl + " execute the connect success after send message=" + string(msg))
} else {
log.Error("执行重新连接后执行的登入函数[ws] " + ws.WsUrl + " ,send message=" + string(msg))
}
//ws.SendMessage(msg)
//log.InfoLog("[ws] [" + ws.WsUrl + "] execute the connect success after send message=" + string(msg))
time.Sleep(time.Second) //wait response
}
for _, sub := range ws.subs {
log.Info("[ws] re subscribe: " + string(sub))
ws.SendMessage(sub)
}
}
}
func (ws *WsConn) writeRequest() {
var (
heartTimer *time.Timer
err error
)
if ws.HeartbeatIntervalTime == 0 {
heartTimer = time.NewTimer(time.Hour)
} else {
heartTimer = time.NewTimer(ws.HeartbeatIntervalTime)
}
for {
select {
case <-ws.close:
log.Info("[ws][" + ws.WsUrl + "] close websocket , exiting write message goroutine.")
return
case d := <-ws.writeBufferChan:
err = ws.c.WriteMessage(websocket.TextMessage, d)
case d := <-ws.pingMessageBufferChan:
err = ws.c.WriteMessage(websocket.PingMessage, d)
case d := <-ws.pongMessageBufferChan:
err = ws.c.WriteMessage(websocket.PongMessage, d)
case d := <-ws.closeMessageBufferChan:
err = ws.c.WriteMessage(websocket.CloseMessage, d)
case <-heartTimer.C:
if ws.HeartbeatIntervalTime > 0 {
err = ws.c.WriteMessage(websocket.TextMessage, ws.HeartbeatData())
heartTimer.Reset(ws.HeartbeatIntervalTime)
}
}
if err != nil {
log.Info("[ws][" + ws.WsUrl + "] write message " + err.Error())
//time.Sleep(time.Second)
}
}
}
func (ws *WsConn) Subscribe(subEvent interface{}) error {
data, err := sonic.Marshal(subEvent)
if err != nil {
log.Error("[ws]["+ws.WsUrl+"] json encode error , ", zap.Error(err))
return err
}
//ws.writeBufferChan <- data
ws.SendMessage(data)
ws.subs = append(ws.subs, data)
return nil
}
func (ws *WsConn) SendMessage(msg []byte) {
defer func() {
//打印panic的错误信息
if err := recover(); err != nil { //产生了panic异常
fmt.Printf("SendMessage,panic: %s\r\n", err)
}
}()
ws.writeBufferChan <- msg
}
func (ws *WsConn) SendPingMessage(msg []byte) {
ws.pingMessageBufferChan <- msg
}
func (ws *WsConn) SendPongMessage(msg []byte) {
ws.pongMessageBufferChan <- msg
}
func (ws *WsConn) SendCloseMessage(msg []byte) {
ws.closeMessageBufferChan <- msg
}
func (ws *WsConn) SendJsonMessage(m interface{}) error {
data, err := sonic.Marshal(m)
if err != nil {
return err
}
//ws.writeBufferChan <- data
ws.SendMessage(data)
return nil
}
func (ws *WsConn) receiveMessage() {
//exit
ws.c.SetCloseHandler(func(code int, text string) error {
log.Info("[ws][" + ws.WsUrl + "] websocket exiting [code=" + utility.IntTostring(code) + " , text=" + text + "]")
//ws.CloseWs()
return nil
})
ws.c.SetPongHandler(func(pong string) error {
// log.Info("[" + ws.WsUrl + "] received [pong] " + pong)
ws.c.SetReadDeadline(time.Now().Add(ws.readDeadLineTime))
return nil
})
ws.c.SetPingHandler(func(ping string) error {
// log.Info("[" + ws.WsUrl + "] received [ping] " + ping)
ws.c.SetReadDeadline(time.Now().Add(ws.readDeadLineTime))
return nil
})
for {
select {
case <-ws.close:
log.Info("[ws][" + ws.WsUrl + "] close websocket , exiting receive message goroutine.")
return
default:
t, msg, err := ws.c.ReadMessage()
if err != nil {
log.Info("ws.c.ReadMessage[ws][" + ws.WsUrl + "] " + err.Error())
if ws.IsAutoReconnect {
log.Info("[ws][" + ws.WsUrl + "] Unexpected Closed , Begin Retry Connect.")
ws.reconnect()
continue
}
if ws.ErrorHandleFunc != nil {
ws.ErrorHandleFunc(err)
}
return
}
// Log.Debug(string(msg))
ws.c.SetReadDeadline(time.Now().Add(ws.readDeadLineTime))
switch t {
case websocket.TextMessage:
ws.ProtoHandleFunc(msg)
case websocket.BinaryMessage:
if ws.DecompressFunc == nil {
ws.ProtoHandleFunc(msg)
} else {
msg2, err := ws.DecompressFunc(msg)
if err != nil {
log.Error("[ws] decompress error " + ws.WsUrl + err.Error())
} else {
ws.ProtoHandleFunc(msg2)
}
}
// case websocket.CloseMessage:
// ws.CloseWs()
default:
log.Error("[ws][" + ws.WsUrl + "] error websocket message type , content is " + string(msg))
}
}
}
}
func (ws *WsConn) CloseWs() {
defer func() {
//打印panic的错误信息
if err := recover(); err != nil { //产生了panic异常
fmt.Printf("CloseWs,panic: %s\r\n", err)
}
}()
//ws.close <- true
close(ws.close)
close(ws.writeBufferChan)
close(ws.closeMessageBufferChan)
close(ws.pingMessageBufferChan)
close(ws.pongMessageBufferChan)
err := ws.c.Close()
if err != nil {
log.Error("CloseWs[ws]["+ws.WsUrl+"] close websocket error ,", zap.Error(err))
}
}
func (ws *WsConn) clearChannel(c chan struct{}) {
for {
if len(c) > 0 {
<-c
} else {
break
}
}
}