182 lines
5.1 KiB
Go
182 lines
5.1 KiB
Go
package futureservice
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"go-admin/common/global"
|
|
"go-admin/common/helper"
|
|
"go-admin/config"
|
|
"go-admin/pkg/utility"
|
|
"go-admin/services/binanceservice"
|
|
"go-admin/services/excservice"
|
|
"sync"
|
|
|
|
"go-admin/models"
|
|
|
|
log "github.com/go-admin-team/go-admin-core/logger"
|
|
|
|
"github.com/bytedance/sonic"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var (
|
|
baseBinanceWsUrlAll = "wss://fstream.binance.com/stream?streams="
|
|
wsBin *excservice.BinanceWs
|
|
binSetKey = make(map[string]bool)
|
|
binSetKeyMu sync.RWMutex
|
|
|
|
quoteAssetSymbols = []string{"USDT"}
|
|
)
|
|
|
|
type BaseWsDepthStream struct {
|
|
Stream string `json:"stream"` //
|
|
Data models.UFuturesDepthBin `json:"data"` //数据
|
|
}
|
|
|
|
// StartBinanceProWs 启动币安现货市场推送
|
|
// workType: normal-常规任务 trigger-主动触发任务
|
|
func StartBinanceProWs(workType string) {
|
|
if wsBin == nil {
|
|
wsBin = excservice.NewBinanceWs(baseBinanceWsUrlAll, "")
|
|
}
|
|
|
|
if wsBin == nil {
|
|
log.Error("实例化wsBin失败")
|
|
return
|
|
}
|
|
|
|
if wsBin != nil && config.ExtConfig.ProxyUrl != "" {
|
|
wsBin.SetProxyUrl(config.ExtConfig.ProxyUrl)
|
|
}
|
|
wsBin.WorkType = workType
|
|
|
|
wsBin.SetAllCallbacks(HandleWsAll, HandleWsAllKline)
|
|
//订阅所有行情
|
|
subscribeAll(wsBin, "!miniTicker@arr")
|
|
|
|
}
|
|
|
|
func subscribeAll(ws *excservice.BinanceWs, subscribe string) {
|
|
err := ws.SubscribeAll(subscribe)
|
|
|
|
if err != nil {
|
|
log.Error("订阅流失败", zap.String("streams", subscribe), zap.Error(err))
|
|
} else {
|
|
log.Info("发起订阅", subscribe)
|
|
}
|
|
}
|
|
|
|
// HandleWsAll 处理从WebSocket接收到的消息
|
|
func HandleWsAll(msg []byte) {
|
|
|
|
if bytes.Contains(msg, []byte("miniTicker@arr")) {
|
|
handleTickerAllMessage(msg)
|
|
}
|
|
}
|
|
|
|
/*
|
|
根据ws 推送值获取 symbol 和交易对信息
|
|
- @dataMap 数据源 dataMap和symbol二选一
|
|
- @symbol 交易对 dataMap和symbol二选一
|
|
*/
|
|
func getWsRespTradeSet(dataMap map[string]interface{}, symbol string, tradeSet *models.TradeSet) (string, error) {
|
|
if symbol == "" {
|
|
symbol = dataMap["s"].(string)
|
|
|
|
if symbol == "" {
|
|
return symbol, errors.New("交易对为空")
|
|
}
|
|
}
|
|
cacheStr, err := helper.DefaultRedis.GetString(fmt.Sprintf(global.TICKER_FUTURES, global.EXCHANGE_BINANCE, symbol))
|
|
|
|
if err != nil {
|
|
// log.Error("获取缓存失败", symbol, err)
|
|
return symbol, errors.New("获取缓存失败 " + err.Error())
|
|
}
|
|
|
|
err = sonic.Unmarshal([]byte(cacheStr), tradeSet)
|
|
|
|
if err != nil {
|
|
return symbol, errors.New("对象转换失败 " + err.Error())
|
|
}
|
|
|
|
return symbol, nil
|
|
}
|
|
|
|
// handleTickerMessage 处理ticker@all消息
|
|
func handleTickerAllMessage(msg []byte) {
|
|
dataAll := tickerAllMessage{}
|
|
if err := sonic.Unmarshal(msg, &dataAll); err != nil {
|
|
log.Error("解码ticker@all消息失败", zap.Error(err))
|
|
return
|
|
}
|
|
// dataMap, ok := dataAll["data"].([]map[string]interface{})
|
|
if len(dataAll.Data) <= 0 {
|
|
log.Error("ticker消息不包含有效数据字段")
|
|
return
|
|
}
|
|
|
|
trades := make([]models.TradeSet, 0)
|
|
|
|
for _, data := range dataAll.Data {
|
|
symbol := data["s"].(string)
|
|
|
|
if symbol == "" || !utility.HasSuffix(symbol, quoteAssetSymbols) {
|
|
continue
|
|
}
|
|
|
|
tradeSet := models.TradeSet{}
|
|
symbol, err := getWsRespTradeSet(data, "", &tradeSet)
|
|
|
|
if err != nil {
|
|
// log.Debug(symbol, "ticker@all ws处理失败", err)
|
|
continue
|
|
}
|
|
tradeSet.LastPrice = utility.StringFloat64Cut(data["c"].(string), int32(tradeSet.PriceDigit))
|
|
tradeSet.OpenPrice = utility.StrToFloatCut(data["o"].(string), int32(tradeSet.PriceDigit))
|
|
tradeSet.HighPrice = utility.StringFloat64Cut(data["h"].(string), int32(tradeSet.PriceDigit))
|
|
tradeSet.LowPrice = utility.StringFloat64Cut(data["l"].(string), int32(tradeSet.PriceDigit))
|
|
tradeSet.Volume = utility.StringFloat64Cut(data["v"].(string), int32(tradeSet.AmountDigit))
|
|
tradeSet.QuoteVolume = utility.StringFloat64Cut(data["q"].(string), 5)
|
|
trades = append(trades, tradeSet)
|
|
|
|
tradeSetVal, _ := sonic.MarshalString(&tradeSet)
|
|
if tradeSetVal != "" {
|
|
if err := helper.DefaultRedis.SetString(fmt.Sprintf(global.TICKER_FUTURES, global.EXCHANGE_BINANCE, symbol), tradeSetVal); err != nil {
|
|
log.Error(symbol, "ticker@all ws处理失败", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(trades) > 0 {
|
|
for index := range trades {
|
|
//主单触发
|
|
utility.SafeGoParam(binanceservice.JudgeFuturesPrice, trades[index])
|
|
//止损信息
|
|
// utility.SafeGoParam(binanceservice.JudgeFuturesStoplossPrice, trades[index])
|
|
}
|
|
}
|
|
}
|
|
|
|
// HandleWsAllKline 处理kline推送结果
|
|
func HandleWsAllKline(msg []byte, tradeSet models.TradeSet) {
|
|
|
|
}
|
|
|
|
type WskLineData struct {
|
|
Line string `json:"i"` //"1m",K线间隔
|
|
Timestamp int64 `json:"t"` // 这根K线的起始时间
|
|
Open string `json:"o"` // 这根K线期间第一笔成交价
|
|
Close string `json:"c"` // 这根K线期间末一笔成交价
|
|
High string `json:"h"` // 这根K线期间最高成交价
|
|
Low string `json:"l"` // 这根K线期间最低成交价
|
|
Vol string `json:"v"` // 这根K线期间成交量
|
|
QuoteVolume string `json:"q"` // 这根K线期间成交额
|
|
}
|
|
|
|
type tickerAllMessage struct {
|
|
Stream string `json:"stream"`
|
|
Data []map[string]interface{} `json:"data"`
|
|
}
|