package futureservice import ( "bytes" "errors" "fmt" "go-admin/common/const/rediskey" "go-admin/common/global" "go-admin/common/helper" "go-admin/config" "go-admin/pkg/utility" "go-admin/services/binanceservice" "go-admin/services/excservice" "sync" "go-admin/models" log "github.com/go-admin-team/go-admin-core/logger" "github.com/shopspring/decimal" "github.com/bytedance/sonic" "go.uber.org/zap" ) var ( baseBinanceWsUrlAll = "wss://fstream.binance.com/stream?streams=" wsBin *excservice.BinanceWs binSetKey = make(map[string]bool) binSetKeyMu sync.RWMutex quoteAssetSymbols = []string{"USDT"} ) type BaseWsDepthStream struct { Stream string `json:"stream"` // Data models.UFuturesDepthBin `json:"data"` //数据 } // StartBinanceProWs 启动币安现货市场推送 // workType: normal-常规任务 trigger-主动触发任务 func StartBinanceProWs(workType string) { if wsBin == nil { wsBin = excservice.NewBinanceWs(baseBinanceWsUrlAll, "") } if wsBin == nil { log.Error("实例化wsBin失败") return } if wsBin != nil && config.ExtConfig.ProxyUrl != "" { wsBin.SetProxyUrl(config.ExtConfig.ProxyUrl) } wsBin.WorkType = workType wsBin.SetAllCallbacks(HandleWsAll, HandleWsAllKline) //订阅所有行情 subscribeAll(wsBin, "!miniTicker@arr") } func subscribeAll(ws *excservice.BinanceWs, subscribe string) { err := ws.SubscribeAll(subscribe) if err != nil { log.Error("订阅流失败", zap.String("streams", subscribe), zap.Error(err)) } else { log.Info("发起订阅", subscribe) } } // HandleWsAll 处理从WebSocket接收到的消息 func HandleWsAll(msg []byte) { if bytes.Contains(msg, []byte("miniTicker@arr")) { handleTickerAllMessage(msg) } } /* 根据ws 推送值获取 symbol 和交易对信息 - @dataMap 数据源 dataMap和symbol二选一 - @symbol 交易对 dataMap和symbol二选一 */ func getWsRespTradeSet(dataMap map[string]interface{}, symbol string, tradeSet *models.TradeSet) (string, error) { if symbol == "" { symbol = dataMap["s"].(string) if symbol == "" { return symbol, errors.New("交易对为空") } } cacheStr, err := helper.DefaultRedis.GetString(fmt.Sprintf(global.TICKER_FUTURES, global.EXCHANGE_BINANCE, symbol)) if err != nil { // log.Error("获取缓存失败", symbol, err) return symbol, errors.New("获取缓存失败 " + err.Error()) } err = sonic.Unmarshal([]byte(cacheStr), tradeSet) if err != nil { return symbol, errors.New("对象转换失败 " + err.Error()) } return symbol, nil } // handleTickerMessage 处理ticker@all消息 func handleTickerAllMessage(msg []byte) { dataAll := tickerAllMessage{} if err := sonic.Unmarshal(msg, &dataAll); err != nil { log.Error("解码ticker@all消息失败", zap.Error(err)) return } // dataMap, ok := dataAll["data"].([]map[string]interface{}) if len(dataAll.Data) <= 0 { log.Error("ticker消息不包含有效数据字段") return } trades := make([]models.TradeSet, 0) for _, data := range dataAll.Data { symbol := data["s"].(string) if symbol == "" || !utility.HasSuffix(symbol, quoteAssetSymbols) { continue } tradeSet := models.TradeSet{} symbol, err := getWsRespTradeSet(data, "", &tradeSet) if err != nil { // log.Debug(symbol, "ticker@all ws处理失败", err) continue } var utcTime int64 var lastPrice decimal.Decimal lastPriceKey := fmt.Sprintf(rediskey.FutureTickerLastPrice, global.EXCHANGE_BINANCE, symbol) if e, ok := data["E"]; ok { utcTime = utility.ToInt64(e) } if e, ok := data["c"]; ok { lastPrice = utility.StrToDecimal(e.(string)).Truncate(int32(tradeSet.PriceDigit)) } tradeSet.LastPrice = lastPrice.String() tradeSet.OpenPrice = utility.StrToFloatCut(data["o"].(string), int32(tradeSet.PriceDigit)) tradeSet.HighPrice = utility.StringFloat64Cut(data["h"].(string), int32(tradeSet.PriceDigit)) tradeSet.LowPrice = utility.StringFloat64Cut(data["l"].(string), int32(tradeSet.PriceDigit)) tradeSet.Volume = utility.StringFloat64Cut(data["v"].(string), int32(tradeSet.AmountDigit)) tradeSet.QuoteVolume = utility.StringFloat64Cut(data["q"].(string), 5) trades = append(trades, tradeSet) tradeSetVal, _ := sonic.MarshalString(&tradeSet) if tradeSetVal != "" { if err := helper.DefaultRedis.SetString(fmt.Sprintf(global.TICKER_FUTURES, global.EXCHANGE_BINANCE, symbol), tradeSetVal); err != nil { log.Error(symbol, "ticker@all ws处理失败", err) } } val, _ := helper.DefaultRedis.GetAllList(rediskey.CacheSymbolLastPrice) if utility.ContainsStr(val, symbol) { //行情存储时间 lastUtc := utcTime - 1000*60*60 content := fmt.Sprintf("%d:%s", utcTime, lastPrice.String()) if _, err := helper.DefaultRedis.RemoveBeforeScore(lastPriceKey, float64(lastUtc)); err != nil { log.Errorf("移除 合约交易对:%s %d之前的最新成交价失败,err:%v", symbol, lastUtc, err) } if err := helper.DefaultRedis.AddSortSet(lastPriceKey, float64(utcTime), content); err != nil { log.Errorf("添加 合约交易对:%s %d之前的最新成交价失败,err:%v", symbol, lastUtc, err) } } } if len(trades) > 0 { for index := range trades { //主单触发 utility.SafeGoParam(binanceservice.JudgeFuturesPrice, trades[index]) //减仓 utility.SafeGoParam(binanceservice.JudgeFuturesReduce, trades[index]) //加仓 utility.SafeGoParam(binanceservice.JudgeFutAddPosition, trades[index]) } } } // HandleWsAllKline 处理kline推送结果 func HandleWsAllKline(msg []byte, tradeSet models.TradeSet) { } type WskLineData struct { Line string `json:"i"` //"1m",K线间隔 Timestamp int64 `json:"t"` // 这根K线的起始时间 Open string `json:"o"` // 这根K线期间第一笔成交价 Close string `json:"c"` // 这根K线期间末一笔成交价 High string `json:"h"` // 这根K线期间最高成交价 Low string `json:"l"` // 这根K线期间最低成交价 Vol string `json:"v"` // 这根K线期间成交量 QuoteVolume string `json:"q"` // 这根K线期间成交额 } type tickerAllMessage struct { Stream string `json:"stream"` Data []map[string]interface{} `json:"data"` }