176 lines
4.6 KiB
Go
176 lines
4.6 KiB
Go
|
|
package excservice
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"errors"
|
|||
|
|
"fmt"
|
|||
|
|
"strconv"
|
|||
|
|
"time"
|
|||
|
|
|
|||
|
|
"go-admin/models"
|
|||
|
|
"go-admin/pkg/timehelper"
|
|||
|
|
"go-admin/pkg/utility"
|
|||
|
|
|
|||
|
|
"github.com/bytedance/sonic"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
type BinanceWs struct {
|
|||
|
|
baseURL string
|
|||
|
|
combinedBaseURL string
|
|||
|
|
proxyUrl string
|
|||
|
|
WorkType string
|
|||
|
|
wsConns []*WsConn
|
|||
|
|
tickerCallback func(models.Ticker24, string, string)
|
|||
|
|
forceCallback func(models.ForceOrder, string, string)
|
|||
|
|
depthCallback func(models.DepthBin, string, string)
|
|||
|
|
tradeCallback func(models.NewDealPush, string, string)
|
|||
|
|
klineCallback func(models.Kline, int, string, string)
|
|||
|
|
allBack func(msg []byte)
|
|||
|
|
allBackKline func(msg []byte, tradeSet models.TradeSet)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func NewBinanceWs(wsbaseURL, proxyUrl string) *BinanceWs {
|
|||
|
|
return &BinanceWs{
|
|||
|
|
baseURL: wsbaseURL,
|
|||
|
|
combinedBaseURL: "wss://stream.binance.com:9443/stream?streams=",
|
|||
|
|
proxyUrl: proxyUrl,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (bnWs *BinanceWs) SetProxyUrl(proxyUrl string) {
|
|||
|
|
bnWs.proxyUrl = proxyUrl
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (bnWs *BinanceWs) SetBaseUrl(baseURL string) {
|
|||
|
|
bnWs.baseURL = baseURL
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (bnWs *BinanceWs) SetCombinedBaseURL(combinedBaseURL string) {
|
|||
|
|
bnWs.combinedBaseURL = combinedBaseURL
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (bnWs *BinanceWs) SetAllCallbacks(allBack func(msg []byte), allBackKline func(msg []byte, tradeSet models.TradeSet)) {
|
|||
|
|
if bnWs.allBack == nil {
|
|||
|
|
bnWs.allBack = allBack
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if bnWs.allBackKline == nil {
|
|||
|
|
bnWs.allBackKline = allBackKline
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 订阅通用函数
|
|||
|
|
func (bnWs *BinanceWs) subscribe(endpoint string, handle func(msg []byte) error) {
|
|||
|
|
wsConn := NewWsBuilder().
|
|||
|
|
WsUrl(endpoint).
|
|||
|
|
AutoReconnect().
|
|||
|
|
ProtoHandleFunc(handle).
|
|||
|
|
ProxyUrl(bnWs.proxyUrl).
|
|||
|
|
ReconnectInterval(time.Millisecond * 5).
|
|||
|
|
Build()
|
|||
|
|
if wsConn == nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
bnWs.wsConns = append(bnWs.wsConns, wsConn)
|
|||
|
|
go bnWs.exitHandler(wsConn)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (bnWs *BinanceWs) Close() {
|
|||
|
|
for _, con := range bnWs.wsConns {
|
|||
|
|
con.CloseWs()
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (bnWs *BinanceWs) Subscribe(streamName string, tradeSet models.TradeSet, callback func(msg []byte, tradeSet models.TradeSet)) error {
|
|||
|
|
endpoint := bnWs.baseURL + streamName
|
|||
|
|
handle := func(msg []byte) error {
|
|||
|
|
callback(msg, tradeSet)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
bnWs.subscribe(endpoint, handle)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (bnWs *BinanceWs) exitHandler(c *WsConn) {
|
|||
|
|
pingTicker := time.NewTicker(1 * time.Minute)
|
|||
|
|
pongTicker := time.NewTicker(30 * time.Second)
|
|||
|
|
defer func() {
|
|||
|
|
pingTicker.Stop()
|
|||
|
|
pongTicker.Stop()
|
|||
|
|
c.CloseWs()
|
|||
|
|
|
|||
|
|
if err := recover(); err != nil {
|
|||
|
|
fmt.Printf("CloseWs, panic: %s\r\n", err)
|
|||
|
|
}
|
|||
|
|
}()
|
|||
|
|
|
|||
|
|
for {
|
|||
|
|
select {
|
|||
|
|
case t := <-pingTicker.C:
|
|||
|
|
c.SendPingMessage([]byte(strconv.Itoa(int(t.UnixNano() / int64(time.Millisecond)))))
|
|||
|
|
case t := <-pongTicker.C:
|
|||
|
|
c.SendPongMessage([]byte(strconv.Itoa(int(t.UnixNano() / int64(time.Millisecond)))))
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func parseJsonToMap(msg []byte) (map[string]interface{}, error) {
|
|||
|
|
datamap := make(map[string]interface{})
|
|||
|
|
err := sonic.Unmarshal(msg, &datamap)
|
|||
|
|
return datamap, err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func handleForceOrder(msg []byte, tradeSet models.TradeSet, callback func(models.ForceOrder, string, string)) error {
|
|||
|
|
datamap, err := parseJsonToMap(msg)
|
|||
|
|
if err != nil {
|
|||
|
|
return fmt.Errorf("json unmarshal error: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
msgType, ok := datamap["e"].(string)
|
|||
|
|
if !ok || msgType != "forceOrder" {
|
|||
|
|
return errors.New("unknown message type")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
datamapo := datamap["o"].(map[string]interface{})
|
|||
|
|
order := models.ForceOrder{
|
|||
|
|
Side: datamapo["S"].(string),
|
|||
|
|
Symbol: datamapo["s"].(string),
|
|||
|
|
Ordertype: datamapo["o"].(string),
|
|||
|
|
TimeInForce: datamapo["f"].(string),
|
|||
|
|
Num: utility.ToFloat64(datamapo["q"]),
|
|||
|
|
Price: utility.ToFloat64(datamapo["p"]),
|
|||
|
|
AvgPrice: utility.ToFloat64(datamapo["ap"]),
|
|||
|
|
State: datamapo["X"].(string),
|
|||
|
|
CreateTime: timehelper.IntToTime(utility.ToInt64(datamapo["T"])),
|
|||
|
|
}
|
|||
|
|
callback(order, tradeSet.Coin, tradeSet.Currency)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// SubscribeAll 订阅 组合streams的URL格式为 /stream?streams=<streamName1>/<streamName2>/<streamName3>
|
|||
|
|
// 订阅组合streams时,事件payload会以这样的格式封装: {"stream":"<streamName>","data":<rawPayload>}
|
|||
|
|
// 单一原始 streams 格式为 /ws/<streamName>
|
|||
|
|
func (bnWs *BinanceWs) SubscribeAll(streamName string) error {
|
|||
|
|
endpoint := bnWs.baseURL + streamName
|
|||
|
|
|
|||
|
|
handle := func(msg []byte) error {
|
|||
|
|
bnWs.allBack(msg)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
bnWs.subscribe(endpoint, handle)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// SubscribeAllKline 订阅kline推送 组合streams的URL格式为 /stream?streams=<streamName1>/<streamName2>/<streamName3>
|
|||
|
|
func (bnWs *BinanceWs) SubscribeAllKline(streamName string, tradeSet models.TradeSet) error {
|
|||
|
|
endpoint := bnWs.baseURL + streamName
|
|||
|
|
|
|||
|
|
handle := func(msg []byte) error {
|
|||
|
|
bnWs.allBackKline(msg, tradeSet)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
bnWs.subscribe(endpoint, handle)
|
|||
|
|
return nil
|
|||
|
|
}
|