Files
exchange_go/services/futureservice/binancemarket.go
2025-03-31 15:37:29 +08:00

208 lines
6.0 KiB
Go

package futureservice
import (
"bytes"
"errors"
"fmt"
"go-admin/common/const/rediskey"
"go-admin/common/global"
"go-admin/common/helper"
"go-admin/config"
"go-admin/pkg/utility"
"go-admin/services/binanceservice"
"go-admin/services/excservice"
"sync"
"go-admin/models"
log "github.com/go-admin-team/go-admin-core/logger"
"github.com/shopspring/decimal"
"github.com/bytedance/sonic"
"go.uber.org/zap"
)
// Package-level state shared by the Binance market WebSocket handlers in this file.
var (
// baseBinanceWsUrlAll is the WebSocket base URL handed to excservice.NewBinanceWs.
// It ends in "streams=", so presumably the stream names are appended to it — TODO confirm in excservice.
baseBinanceWsUrlAll = "wss://fstream.binance.com/stream?streams="
// wsBin is the shared WebSocket client; StartBinanceProWs creates it on first use.
wsBin *excservice.BinanceWs
// binSetKey is not referenced in the code visible here — NOTE(review): confirm it is still needed.
binSetKey = make(map[string]bool)
// binSetKeyMu presumably guards binSetKey — verify against its users.
binSetKeyMu sync.RWMutex
// quoteAssetSymbols lists the symbol suffixes accepted by handleTickerAllMessage;
// symbols without one of these suffixes are skipped.
quoteAssetSymbols = []string{"USDT"}
)
// BaseWsDepthStream is the JSON envelope of a depth message: a stream name
// under "stream" and the depth payload under "data".
type BaseWsDepthStream struct {
Stream string `json:"stream"` // stream name (presumably the stream this payload was pushed on — confirm)
Data models.UFuturesDepthBin `json:"data"` // payload data
}
// StartBinanceProWs starts the Binance market WebSocket push: it makes sure
// the shared client exists, applies the configured proxy (if any), installs
// the message callbacks and subscribes to the all-symbols mini-ticker stream.
//
// workType is stored on the client: "normal" for the regular task,
// "trigger" for an actively triggered task.
func StartBinanceProWs(workType string) {
	// Create the shared client on first use; give up if construction yields nil.
	if wsBin == nil {
		wsBin = excservice.NewBinanceWs(baseBinanceWsUrlAll, "")
		if wsBin == nil {
			log.Error("实例化wsBin失败")
			return
		}
	}

	// wsBin is known to be non-nil from here on.
	if proxyURL := config.ExtConfig.ProxyUrl; proxyURL != "" {
		wsBin.SetProxyUrl(proxyURL)
	}

	wsBin.WorkType = workType
	wsBin.SetAllCallbacks(HandleWsAll, HandleWsAllKline)

	// Subscribe to the ticker stream covering all symbols.
	subscribeAll(wsBin, "!miniTicker@arr")
}
// subscribeAll asks ws to subscribe to the given stream name(s) and logs
// whether the subscription request succeeded.
func subscribeAll(ws *excservice.BinanceWs, subscribe string) {
	if err := ws.SubscribeAll(subscribe); err != nil {
		log.Error("订阅流失败", zap.String("streams", subscribe), zap.Error(err))
		return
	}
	log.Info("发起订阅", subscribe)
}
// HandleWsAll handles a raw message received from the WebSocket. Only
// messages containing the "miniTicker@arr" marker are processed; every other
// message is ignored.
func HandleWsAll(msg []byte) {
	isMiniTicker := bytes.Contains(msg, []byte("miniTicker@arr"))
	if !isMiniTicker {
		return
	}
	handleTickerAllMessage(msg)
}
// getWsRespTradeSet resolves the symbol of a WebSocket payload and loads that
// symbol's cached trade set from Redis into tradeSet.
//
// Parameters:
//   - dataMap: decoded payload; its "s" entry is consulted only when symbol is empty.
//   - symbol: trading pair; when non-empty it is used as-is and dataMap is ignored.
//   - tradeSet: destination the cached JSON value is unmarshalled into.
//
// It returns the resolved symbol together with an error when the symbol cannot
// be determined, the cache read fails, or the cached value cannot be decoded.
// The underlying error is wrapped, so callers may inspect it with errors.Is/As.
func getWsRespTradeSet(dataMap map[string]interface{}, symbol string, tradeSet *models.TradeSet) (string, error) {
	if symbol == "" {
		// Comma-ok assertion: a nil dataMap, a missing "s" key or a non-string
		// value leaves symbol empty and is reported as an error rather than
		// panicking inside the WebSocket callback.
		symbol, _ = dataMap["s"].(string)
		if symbol == "" {
			return symbol, errors.New("交易对为空")
		}
	}

	cacheKey := fmt.Sprintf(global.TICKER_FUTURES, global.EXCHANGE_BINANCE, symbol)
	cacheStr, err := helper.DefaultRedis.GetString(cacheKey)
	if err != nil {
		return symbol, fmt.Errorf("获取缓存失败 %w", err)
	}

	if err = sonic.Unmarshal([]byte(cacheStr), tradeSet); err != nil {
		return symbol, fmt.Errorf("对象转换失败 %w", err)
	}
	return symbol, nil
}
// handleTickerAllMessage processes one "miniTicker@arr" push.
//
// For every entry whose symbol ends with one of quoteAssetSymbols and for
// which a cached trade set can be loaded, it:
//  1. updates last/open/high/low prices and the volumes on the trade set and
//     writes it back to Redis;
//  2. trims the per-symbol last-price sorted set to the hour preceding the
//     event time and adds the new last price scored by the event time;
//  3. after the loop, passes each updated trade set to the main-order trigger,
//     reduce-position and add-position checks through utility.SafeGoParam.
//
// Entries whose string fields ("s", "c", "o", "h", "l", "v", "q") are missing
// or not strings are skipped instead of panicking on a failed type assertion;
// this also means an entry without a close price is never processed with a
// zero last price.
func handleTickerAllMessage(msg []byte) {
	// Retention window of the last-price sorted set, in milliseconds (one hour).
	const lastPriceRetentionMs int64 = 1000 * 60 * 60

	dataAll := tickerAllMessage{}
	if err := sonic.Unmarshal(msg, &dataAll); err != nil {
		log.Error("解码ticker@all消息失败", zap.Error(err))
		return
	}
	if len(dataAll.Data) == 0 {
		log.Error("ticker消息不包含有效数据字段")
		return
	}

	trades := make([]models.TradeSet, 0, len(dataAll.Data))
	for _, data := range dataAll.Data {
		symbol, _ := data["s"].(string)
		if symbol == "" || !utility.HasSuffix(symbol, quoteAssetSymbols) {
			continue
		}

		// Validate every string field up front so a malformed entry costs
		// neither a Redis round trip nor a panic.
		closeStr, okClose := data["c"].(string)
		openStr, okOpen := data["o"].(string)
		highStr, okHigh := data["h"].(string)
		lowStr, okLow := data["l"].(string)
		volumeStr, okVolume := data["v"].(string)
		quoteVolumeStr, okQuote := data["q"].(string)
		if !(okClose && okOpen && okHigh && okLow && okVolume && okQuote) {
			log.Error(symbol, "ticker@all 消息字段缺失或类型错误")
			continue
		}

		tradeSet := models.TradeSet{}
		// symbol is already known to be non-empty, so it is passed directly.
		if _, err := getWsRespTradeSet(data, symbol, &tradeSet); err != nil {
			// log.Debug(symbol, "ticker@all ws处理失败", err)
			continue
		}

		// Event time; stays 0 when the entry carries no "E" field.
		var utcTime int64
		if e, ok := data["E"]; ok {
			utcTime = utility.ToInt64(e)
		}

		priceDigit := int32(tradeSet.PriceDigit)
		lastPriceStr := utility.StrToDecimal(closeStr).Truncate(priceDigit).String()

		tradeSet.LastPrice = lastPriceStr
		tradeSet.OpenPrice = utility.StrToFloatCut(openStr, priceDigit)
		tradeSet.HighPrice = utility.StringFloat64Cut(highStr, priceDigit)
		tradeSet.LowPrice = utility.StringFloat64Cut(lowStr, priceDigit)
		tradeSet.Volume = utility.StringFloat64Cut(volumeStr, int32(tradeSet.AmountDigit))
		tradeSet.QuoteVolume = utility.StringFloat64Cut(quoteVolumeStr, 5)
		trades = append(trades, tradeSet)

		// Write the refreshed trade set back to the ticker cache.
		tickerKey := fmt.Sprintf(global.TICKER_FUTURES, global.EXCHANGE_BINANCE, symbol)
		tradeSetVal, err := sonic.MarshalString(&tradeSet)
		if err != nil {
			log.Error(symbol, "ticker@all 序列化失败", err)
		} else if tradeSetVal != "" {
			if err := helper.DefaultRedis.SetString(tickerKey, tradeSetVal); err != nil {
				log.Error(symbol, "ticker@all ws处理失败", err)
			}
		}

		// Keep only the most recent hour of last prices, then record the new one.
		lastPriceKey := fmt.Sprintf(rediskey.FutureTickerLastPrice, global.EXCHANGE_BINANCE, symbol)
		lastUtc := utcTime - lastPriceRetentionMs
		if _, err := helper.DefaultRedis.RemoveBeforeScore(lastPriceKey, float64(lastUtc)); err != nil {
			log.Errorf("移除 合约交易对:%s %d之前的最新成交价失败,err:%v", symbol, lastUtc, err)
		}
		// NOTE(review): this message reuses the "before lastUtc" wording of the
		// removal above; the text is kept unchanged here.
		if err := helper.DefaultRedis.AddSortSet(lastPriceKey, float64(utcTime), lastPriceStr); err != nil {
			log.Errorf("添加 合约交易对:%s %d之前的最新成交价失败,err:%v", symbol, lastUtc, err)
		}
	}

	for index := range trades {
		// Main-order trigger.
		utility.SafeGoParam(binanceservice.JudgeFuturesPrice, trades[index])
		// Reduce position.
		utility.SafeGoParam(binanceservice.JudgeFuturesReduce, trades[index])
		// Add position.
		utility.SafeGoParam(binanceservice.JudgeFutAddPosition, trades[index])
	}
}
// HandleWsAllKline handles kline push results. The body is currently empty,
// so msg and tradeSet are both ignored; StartBinanceProWs passes it to
// SetAllCallbacks as the second callback.
func HandleWsAllKline(msg []byte, tradeSet models.TradeSet) {
}
// WskLineData holds the fields of a single kline (candlestick) as decoded
// from JSON; prices and volumes are kept as strings.
type WskLineData struct {
Line string `json:"i"` // kline interval, e.g. "1m"
Timestamp int64 `json:"t"` // start time of this kline
Open string `json:"o"` // price of the first trade during this kline
Close string `json:"c"` // price of the last trade during this kline
High string `json:"h"` // highest trade price during this kline
Low string `json:"l"` // lowest trade price during this kline
Vol string `json:"v"` // traded volume during this kline
QuoteVolume string `json:"q"` // traded quote amount (turnover) during this kline
}
// tickerAllMessage is the envelope decoded by handleTickerAllMessage: a stream
// name plus a list of ticker entries kept as generic maps.
type tickerAllMessage struct {
Stream string `json:"stream"` // stream name
Data []map[string]interface{} `json:"data"` // one map per ticker entry; handleTickerAllMessage reads keys "s", "E", "c", "o", "h", "l", "v" and "q"
}