176 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			176 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| 
								 | 
							
								package excservice
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								import (
							 | 
						|||
| 
								 | 
							
									"errors"
							 | 
						|||
| 
								 | 
							
									"fmt"
							 | 
						|||
| 
								 | 
							
									"strconv"
							 | 
						|||
| 
								 | 
							
									"time"
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									"go-admin/models"
							 | 
						|||
| 
								 | 
							
									"go-admin/pkg/timehelper"
							 | 
						|||
| 
								 | 
							
									"go-admin/pkg/utility"
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									"github.com/bytedance/sonic"
							 | 
						|||
| 
								 | 
							
								)
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								type BinanceWs struct {
							 | 
						|||
| 
								 | 
							
									baseURL         string
							 | 
						|||
| 
								 | 
							
									combinedBaseURL string
							 | 
						|||
| 
								 | 
							
									proxyUrl        string
							 | 
						|||
| 
								 | 
							
									WorkType        string
							 | 
						|||
| 
								 | 
							
									wsConns         []*WsConn
							 | 
						|||
| 
								 | 
							
									tickerCallback  func(models.Ticker24, string, string)
							 | 
						|||
| 
								 | 
							
									forceCallback   func(models.ForceOrder, string, string)
							 | 
						|||
| 
								 | 
							
									depthCallback   func(models.DepthBin, string, string)
							 | 
						|||
| 
								 | 
							
									tradeCallback   func(models.NewDealPush, string, string)
							 | 
						|||
| 
								 | 
							
									klineCallback   func(models.Kline, int, string, string)
							 | 
						|||
| 
								 | 
							
									allBack         func(msg []byte)
							 | 
						|||
| 
								 | 
							
									allBackKline    func(msg []byte, tradeSet models.TradeSet)
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								func NewBinanceWs(wsbaseURL, proxyUrl string) *BinanceWs {
							 | 
						|||
| 
								 | 
							
									return &BinanceWs{
							 | 
						|||
| 
								 | 
							
										baseURL:         wsbaseURL,
							 | 
						|||
| 
								 | 
							
										combinedBaseURL: "wss://stream.binance.com:9443/stream?streams=",
							 | 
						|||
| 
								 | 
							
										proxyUrl:        proxyUrl,
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								func (bnWs *BinanceWs) SetProxyUrl(proxyUrl string) {
							 | 
						|||
| 
								 | 
							
									bnWs.proxyUrl = proxyUrl
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								func (bnWs *BinanceWs) SetBaseUrl(baseURL string) {
							 | 
						|||
| 
								 | 
							
									bnWs.baseURL = baseURL
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								func (bnWs *BinanceWs) SetCombinedBaseURL(combinedBaseURL string) {
							 | 
						|||
| 
								 | 
							
									bnWs.combinedBaseURL = combinedBaseURL
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								func (bnWs *BinanceWs) SetAllCallbacks(allBack func(msg []byte), allBackKline func(msg []byte, tradeSet models.TradeSet)) {
							 | 
						|||
| 
								 | 
							
									if bnWs.allBack == nil {
							 | 
						|||
| 
								 | 
							
										bnWs.allBack = allBack
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									if bnWs.allBackKline == nil {
							 | 
						|||
| 
								 | 
							
										bnWs.allBackKline = allBackKline
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// 订阅通用函数
							 | 
						|||
| 
								 | 
							
								func (bnWs *BinanceWs) subscribe(endpoint string, handle func(msg []byte) error) {
							 | 
						|||
| 
								 | 
							
									wsConn := NewWsBuilder().
							 | 
						|||
| 
								 | 
							
										WsUrl(endpoint).
							 | 
						|||
| 
								 | 
							
										AutoReconnect().
							 | 
						|||
| 
								 | 
							
										ProtoHandleFunc(handle).
							 | 
						|||
| 
								 | 
							
										ProxyUrl(bnWs.proxyUrl).
							 | 
						|||
| 
								 | 
							
										ReconnectInterval(time.Millisecond * 5).
							 | 
						|||
| 
								 | 
							
										Build()
							 | 
						|||
| 
								 | 
							
									if wsConn == nil {
							 | 
						|||
| 
								 | 
							
										return
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
									bnWs.wsConns = append(bnWs.wsConns, wsConn)
							 | 
						|||
| 
								 | 
							
									go bnWs.exitHandler(wsConn)
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								func (bnWs *BinanceWs) Close() {
							 | 
						|||
| 
								 | 
							
									for _, con := range bnWs.wsConns {
							 | 
						|||
| 
								 | 
							
										con.CloseWs()
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								func (bnWs *BinanceWs) Subscribe(streamName string, tradeSet models.TradeSet, callback func(msg []byte, tradeSet models.TradeSet)) error {
							 | 
						|||
| 
								 | 
							
									endpoint := bnWs.baseURL + streamName
							 | 
						|||
| 
								 | 
							
									handle := func(msg []byte) error {
							 | 
						|||
| 
								 | 
							
										callback(msg, tradeSet)
							 | 
						|||
| 
								 | 
							
										return nil
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
									bnWs.subscribe(endpoint, handle)
							 | 
						|||
| 
								 | 
							
									return nil
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								func (bnWs *BinanceWs) exitHandler(c *WsConn) {
							 | 
						|||
| 
								 | 
							
									pingTicker := time.NewTicker(1 * time.Minute)
							 | 
						|||
| 
								 | 
							
									pongTicker := time.NewTicker(30 * time.Second)
							 | 
						|||
| 
								 | 
							
									defer func() {
							 | 
						|||
| 
								 | 
							
										pingTicker.Stop()
							 | 
						|||
| 
								 | 
							
										pongTicker.Stop()
							 | 
						|||
| 
								 | 
							
										c.CloseWs()
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
										if err := recover(); err != nil {
							 | 
						|||
| 
								 | 
							
											fmt.Printf("CloseWs, panic: %s\r\n", err)
							 | 
						|||
| 
								 | 
							
										}
							 | 
						|||
| 
								 | 
							
									}()
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									for {
							 | 
						|||
| 
								 | 
							
										select {
							 | 
						|||
| 
								 | 
							
										case t := <-pingTicker.C:
							 | 
						|||
| 
								 | 
							
											c.SendPingMessage([]byte(strconv.Itoa(int(t.UnixNano() / int64(time.Millisecond)))))
							 | 
						|||
| 
								 | 
							
										case t := <-pongTicker.C:
							 | 
						|||
| 
								 | 
							
											c.SendPongMessage([]byte(strconv.Itoa(int(t.UnixNano() / int64(time.Millisecond)))))
							 | 
						|||
| 
								 | 
							
										}
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								func parseJsonToMap(msg []byte) (map[string]interface{}, error) {
							 | 
						|||
| 
								 | 
							
									datamap := make(map[string]interface{})
							 | 
						|||
| 
								 | 
							
									err := sonic.Unmarshal(msg, &datamap)
							 | 
						|||
| 
								 | 
							
									return datamap, err
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								func handleForceOrder(msg []byte, tradeSet models.TradeSet, callback func(models.ForceOrder, string, string)) error {
							 | 
						|||
| 
								 | 
							
									datamap, err := parseJsonToMap(msg)
							 | 
						|||
| 
								 | 
							
									if err != nil {
							 | 
						|||
| 
								 | 
							
										return fmt.Errorf("json unmarshal error: %v", err)
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									msgType, ok := datamap["e"].(string)
							 | 
						|||
| 
								 | 
							
									if !ok || msgType != "forceOrder" {
							 | 
						|||
| 
								 | 
							
										return errors.New("unknown message type")
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									datamapo := datamap["o"].(map[string]interface{})
							 | 
						|||
| 
								 | 
							
									order := models.ForceOrder{
							 | 
						|||
| 
								 | 
							
										Side:        datamapo["S"].(string),
							 | 
						|||
| 
								 | 
							
										Symbol:      datamapo["s"].(string),
							 | 
						|||
| 
								 | 
							
										Ordertype:   datamapo["o"].(string),
							 | 
						|||
| 
								 | 
							
										TimeInForce: datamapo["f"].(string),
							 | 
						|||
| 
								 | 
							
										Num:         utility.ToFloat64(datamapo["q"]),
							 | 
						|||
| 
								 | 
							
										Price:       utility.ToFloat64(datamapo["p"]),
							 | 
						|||
| 
								 | 
							
										AvgPrice:    utility.ToFloat64(datamapo["ap"]),
							 | 
						|||
| 
								 | 
							
										State:       datamapo["X"].(string),
							 | 
						|||
| 
								 | 
							
										CreateTime:  timehelper.IntToTime(utility.ToInt64(datamapo["T"])),
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
									callback(order, tradeSet.Coin, tradeSet.Currency)
							 | 
						|||
| 
								 | 
							
									return nil
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// SubscribeAll 订阅 组合streams的URL格式为 /stream?streams=<streamName1>/<streamName2>/<streamName3>
							 | 
						|||
| 
								 | 
							
								// 订阅组合streams时,事件payload会以这样的格式封装: {"stream":"<streamName>","data":<rawPayload>}
							 | 
						|||
| 
								 | 
							
								// 单一原始 streams 格式为 /ws/<streamName>
							 | 
						|||
| 
								 | 
							
								func (bnWs *BinanceWs) SubscribeAll(streamName string) error {
							 | 
						|||
| 
								 | 
							
									endpoint := bnWs.baseURL + streamName
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									handle := func(msg []byte) error {
							 | 
						|||
| 
								 | 
							
										bnWs.allBack(msg)
							 | 
						|||
| 
								 | 
							
										return nil
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									bnWs.subscribe(endpoint, handle)
							 | 
						|||
| 
								 | 
							
									return nil
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// SubscribeAllKline 订阅kline推送 组合streams的URL格式为 /stream?streams=<streamName1>/<streamName2>/<streamName3>
							 | 
						|||
| 
								 | 
							
								func (bnWs *BinanceWs) SubscribeAllKline(streamName string, tradeSet models.TradeSet) error {
							 | 
						|||
| 
								 | 
							
									endpoint := bnWs.baseURL + streamName
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									handle := func(msg []byte) error {
							 | 
						|||
| 
								 | 
							
										bnWs.allBackKline(msg, tradeSet)
							 | 
						|||
| 
								 | 
							
										return nil
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									bnWs.subscribe(endpoint, handle)
							 | 
						|||
| 
								 | 
							
									return nil
							 | 
						|||
| 
								 | 
							
								}
							 |