From bd7a6d3870c53fd67b2e30b05b6e0c9942801177 Mon Sep 17 00:00:00 2001 From: hucan <951870319@qq.com> Date: Wed, 5 Nov 2025 16:26:21 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E6=9C=89=E6=9B=B4=E6=96=B0=EF=BC=8C?= =?UTF-8?q?=E8=BF=98=E6=B2=A1=E6=B5=8B=E5=AE=8C=EF=BC=8C=E6=9A=82=E6=97=B6?= =?UTF-8?q?=E5=BD=92=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/admin/models/line_api_user.go | 28 +-- app/admin/service/dto/line_api_user.go | 8 +- app/admin/service/dto/line_pre_order.go | 16 +- common/const/rediskey/redis_key.go | 16 +- config/serverinit/business_init.go | 2 +- config/serverinit/usersubscribeinit.go | 16 +- config/settings.yml | 10 + .../binanceservice/futuresjudgeservice.go | 134 ++++++++++--- services/binanceservice/futuresrest.go | 69 +++++-- services/binanceservice/spotjudgeservice.go | 134 ++++++++----- services/binanceservice/spotreset.go | 94 ++++++++- services/excservice/binancesocketmanager.go | 181 +++++++----------- services/excservice/binancews.go | 33 ++-- services/excservice/websocket.go | 64 ++++--- services/orderservice/order_service.go | 27 ++- 15 files changed, 536 insertions(+), 296 deletions(-) diff --git a/app/admin/models/line_api_user.go b/app/admin/models/line_api_user.go index 3e6a673..c51f30f 100644 --- a/app/admin/models/line_api_user.go +++ b/app/admin/models/line_api_user.go @@ -9,20 +9,20 @@ type LineApiUser struct { ExchangeType string `json:"exchangeType" gorm:"type:varchar(20);comment:交易所类型(字典 exchange_type)"` UserId int64 `json:"userId" gorm:"type:int unsigned;comment:用户id"` - JysId int64 `json:"jysId" gorm:"type:int;comment:关联交易所账号id"` - ApiName string `json:"apiName" gorm:"type:varchar(255);comment:api用户名"` - ApiKey string `json:"apiKey" gorm:"type:varchar(255);comment:apiKey"` - ApiSecret string `json:"apiSecret" gorm:"type:varchar(255);comment:apiSecret"` - Passphrase string `json:"passphrase" gorm:"type:varchar(255);comment:api密码"` - IpAddress string `json:"ipAddress" gorm:"type:varchar(255);comment:代理地址"` - UserPass string `json:"userPass" gorm:"type:varchar(255);comment:代码账号密码"` - AdminId int64 `json:"adminId" gorm:"type:int unsigned;comment:管理员id"` - Affiliation int64 `json:"affiliation" gorm:"type:int;comment:归属:1=现货,2=合约,3=现货合约"` - AdminShow int64 `json:"adminShow" gorm:"type:int;comment:是否超管可见:1=是,0=否"` - Site string `json:"site" gorm:"type:enum('1','2','3');comment:允许下单的方向:1=多;2=空;3=多空"` - Subordinate string `json:"subordinate" gorm:"type:enum('0','1','2');comment:从属关系:0=未绑定关系;1=主账号;2=副帐号"` - GroupId int64 `json:"groupId" gorm:"type:int unsigned;comment:所属组id"` - OpenStatus int64 `json:"openStatus" gorm:"type:int unsigned;comment:开启状态 0=关闭 1=开启"` + // JysId int64 `json:"jysId" gorm:"type:int;comment:关联交易所账号id"` + ApiName string `json:"apiName" gorm:"type:varchar(255);comment:api用户名"` + ApiKey string `json:"apiKey" gorm:"type:varchar(255);comment:apiKey"` + ApiSecret string `json:"apiSecret" gorm:"type:varchar(255);comment:apiSecret"` + Passphrase string `json:"passphrase" gorm:"type:varchar(255);comment:api密码"` + IpAddress string `json:"ipAddress" gorm:"type:varchar(255);comment:代理地址"` + UserPass string `json:"userPass" gorm:"type:varchar(255);comment:代码账号密码"` + // AdminId int64 `json:"adminId" gorm:"type:int unsigned;comment:管理员id"` + Affiliation int64 `json:"affiliation" gorm:"type:int;comment:归属:1=现货,2=合约,3=现货合约"` + AdminShow int64 `json:"adminShow" gorm:"type:int;comment:是否超管可见:1=是,0=否"` + Site string `json:"site" gorm:"type:enum('1','2','3');comment:允许下单的方向:1=多;2=空;3=多空"` + Subordinate string `json:"subordinate" gorm:"type:enum('0','1','2');comment:从属关系:0=未绑定关系;1=主账号;2=副帐号"` + GroupId int64 `json:"groupId" gorm:"type:int unsigned;comment:所属组id"` + OpenStatus int64 `json:"openStatus" gorm:"type:int unsigned;comment:开启状态 0=关闭 1=开启"` SpotLastTime string `json:"spotLastTime" gorm:"-"` //现货websocket最后通信时间 FuturesLastTime string `json:"futuresLastTime" gorm:"-"` //合约websocket最后通信时间 diff --git a/app/admin/service/dto/line_api_user.go b/app/admin/service/dto/line_api_user.go index d710531..0094507 100644 --- a/app/admin/service/dto/line_api_user.go +++ b/app/admin/service/dto/line_api_user.go @@ -66,13 +66,13 @@ func (s *LineApiUserInsertReq) Generate(model *models.LineApiUser) { } model.ExchangeType = s.ExchangeType model.UserId = s.UserId - model.JysId = s.JysId + // model.JysId = s.JysId model.ApiName = s.ApiName model.ApiKey = s.ApiKey model.ApiSecret = s.ApiSecret model.IpAddress = s.IpAddress model.UserPass = s.UserPass - model.AdminId = s.AdminId + // model.AdminId = s.AdminId model.Affiliation = s.Affiliation model.AdminShow = s.AdminShow model.Site = s.Site @@ -112,13 +112,13 @@ func (s *LineApiUserUpdateReq) Generate(model *models.LineApiUser) { } model.ExchangeType = s.ExchangeType model.UserId = s.UserId - model.JysId = s.JysId + // model.JysId = s.JysId model.ApiName = s.ApiName model.ApiKey = s.ApiKey model.ApiSecret = s.ApiSecret model.IpAddress = s.IpAddress model.UserPass = s.UserPass - model.AdminId = s.AdminId + // model.AdminId = s.AdminId model.Affiliation = s.Affiliation model.AdminShow = s.AdminShow model.Site = s.Site diff --git a/app/admin/service/dto/line_pre_order.go b/app/admin/service/dto/line_pre_order.go index 1868464..4e387fd 100644 --- a/app/admin/service/dto/line_pre_order.go +++ b/app/admin/service/dto/line_pre_order.go @@ -300,7 +300,7 @@ func (req LineAddPreOrderReq) Valid() error { } //减仓100% - if req.ReduceNumRatio.Cmp(decimal.NewFromInt(100)) == 0 { + if req.ReduceNumRatio.Cmp(decimal.NewFromInt(100)) == 0 && len(req.Ext) > 0 { return errors.New("减仓数量百分百,后面的节点不需要添加") } @@ -599,6 +599,20 @@ type StopLossRedisList struct { ApiId int `json:"api_id"` } +type TakeProfitRedisList struct { + Id int `json:"id"` + PId int `json:"pid"` //父级id + MainId int `json:"mainId"` //主单id + OrderTye int `json:"orderType"` + SymbolType int `json:"symbolType"` + OrderCategory int `json:"orderCategory"` + Symbol string `json:"symbol"` + Price decimal.Decimal `json:"price"` //触发价(根据主单价格触发) + Site string `json:"site"` + ApiId int `json:"api_id"` + OrderSn string `json:"订单编号" comment:"订单编号"` +} + // ManuallyCover 手动加仓请求体 type ManuallyCover struct { ApiId int `json:"api_id"` //主账号id diff --git a/common/const/rediskey/redis_key.go b/common/const/rediskey/redis_key.go index 2fd94b9..590a299 100644 --- a/common/const/rediskey/redis_key.go +++ b/common/const/rediskey/redis_key.go @@ -66,10 +66,18 @@ const ( //需要清理键值---------BEGIN--------------- - SpotStopLossList = "spot_stoploss_list:%s" //现货止损待触发列表 {交易所类型code} - SpotReduceList = "spot_reduce_list:%s" //现货减仓待触发 {交易所类型code} - FuturesStopLossList = "futures_stoploss_list:%s" //合约止损待触发列表 {交易所类型code} - FuturesReduceList = "futures_reduce_list:%s" //合约减仓待触发 {交易所类型code} + //现货止损待触发列表 {交易所类型code} + SpotStopLossList = "spot_stoploss_list:%s" + //现货止盈待触发列表 {交易所类型code} + SpotTakeList = "spot_takelist:%s" + //现货减仓待触发 {交易所类型code} + SpotReduceList = "spot_reduce_list:%s" + //合约止损待触发列表 {交易所类型code} + FuturesStopLossList = "futures_stoploss_list:%s" + //合约止盈待触发列表 {交易所类型code} + FuturesTakeList = "futures_takelist:%s" + //合约减仓待触发 {交易所类型code} + FuturesReduceList = "futures_reduce_list:%s" //现货加仓待触发 {交易所code} SpotAddPositionList = "spot_add_position_list:%s" //合约加仓待触发 {交易所code} diff --git a/config/serverinit/business_init.go b/config/serverinit/business_init.go index c34f617..a9be9e5 100644 --- a/config/serverinit/business_init.go +++ b/config/serverinit/business_init.go @@ -57,7 +57,7 @@ func BinanceMarketInit() error { FuturesInit() //现货 订阅 - // SpotSubscribeInit("normal") + SpotSubscribeInit("normal") //合约订阅 futureservice.StartBinanceProWs("normal") // log.Info("订阅合约-开始") diff --git a/config/serverinit/usersubscribeinit.go b/config/serverinit/usersubscribeinit.go index c58f569..a10577b 100644 --- a/config/serverinit/usersubscribeinit.go +++ b/config/serverinit/usersubscribeinit.go @@ -47,20 +47,20 @@ func UserSubscribeInit(orm *gorm.DB, ctx context.Context) { //移除连接 for _, item := range deleteKeys { + // 停止并删除现货连接 if wm, ok := excservice.SpotSockets[item]; ok { wm.Stop() - - delete(excservice.FutureSockets, item) - } - - if wm, ok := excservice.SpotSockets[item]; ok { - wm.Stop() - delete(excservice.SpotSockets, item) } + // 停止并删除合约连接 + if wm, ok := excservice.FutureSockets[item]; ok { + wm.Stop() + delete(excservice.FutureSockets, item) + } + if _, err := helper.DefaultRedis.LRem(rediskey.ApiUserDeleteList, item); err != nil { - log.Error("移除 待关闭websocket 失败:", err) + log.Errorf("移除待关闭websocket失败: %v", err) } } diff --git a/config/settings.yml b/config/settings.yml index e666e17..8a8eac8 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -65,6 +65,16 @@ settings: spotRestURL: "https://api.binance.com" futRestURL: "https://fapi.binance.com" + #Bitget配置 + bitgetSet: + restURL: "https://api.bitget.com" + wsURL: "wss://ws.bitget.com/mix/v1/stream" + apiKey: "bg_b205e9eda303b5e579041b2e979bdbe7" + secretKey: "dbdd0f8cdac203da340537de791691e2506a6f25f8d1d11b05afb9c121fa5dd1" + passphrase: "aa123456" + timeoutSecond: 30 + signType: "SHA256" + # 邮箱发送配置 emailConfig: mail_smtp_host: "smtp.163.com" diff --git a/services/binanceservice/futuresjudgeservice.go b/services/binanceservice/futuresjudgeservice.go index f026f85..12e7866 100644 --- a/services/binanceservice/futuresjudgeservice.go +++ b/services/binanceservice/futuresjudgeservice.go @@ -19,6 +19,7 @@ import ( "time" "github.com/bytedance/sonic" + "github.com/go-admin-team/go-admin-core/logger" log "github.com/go-admin-team/go-admin-core/logger" "github.com/shopspring/decimal" "gorm.io/gorm" @@ -35,6 +36,8 @@ func JudgeFuturesPrice(tradeSet models.TradeSet) { return } futApi := FutRestApi{} + orderService := orderservice.OrderService{} + orderService.Orm = db for _, item := range preOrderVal { preOrder := dto.PreOrderRedisList{} @@ -57,14 +60,32 @@ func JudgeFuturesPrice(tradeSet models.TradeSet) { //多 if (strings.ToUpper(preOrder.Site) == "BUY" && orderPrice.Cmp(tradePrice) >= 0) || (strings.ToUpper(preOrder.Site) == "SELL" && orderPrice.Cmp(tradePrice) <= 0) { - futTriggerOrder(db, &preOrder, item, futApi) + apiInfo, _ := GetApiInfo(preOrder.ApiId) + if err := commonservice.JudgeWebsocketTimeout(apiInfo.ApiKey, 2); err != nil { + log.Errorf("合约行情订阅超时,apiKey:%s err:%v", apiInfo.ApiKey, err) + + if err1 := orderService.ErrorTrigger(preOrder.Id, 2, + global.EXCHANGE_BINANCE, item, + fmt.Sprintf("行情触发失败,err:%v", err)); err1 != nil { + log.Error("触发失败", err1) + } + continue + } else { + futTriggerOrder(db, &preOrder, item, futApi) + } } } } -// 分布式锁下单 +// 分布式锁下单(合约预设主单触发) +// 参数说明: +// - db: 数据库连接,用于查询与更新预下单状态 +// - v: 预下单信息(包含价格、数量、订单号等) +// - item: 从 Redis 列表读取的原始字符串,用于幂等删除 +// - futApi: 合约下单 API 封装 +// 逻辑:在 FutTrigger 锁下校验订单有效性,执行下单并更新状态,最后用原始 item 删除缓存,避免序列化差异导致删除失败。 func futTriggerOrder(db *gorm.DB, v *dto.PreOrderRedisList, item string, futApi FutRestApi) { - lock := helper.NewRedisLock(fmt.Sprintf(rediskey.SpotTrigger, v.ApiId, v.Symbol), 200, 5, 100*time.Millisecond) + lock := helper.NewRedisLock(fmt.Sprintf(rediskey.FutTrigger, v.ApiId, v.Symbol), 200, 5, 100*time.Millisecond) if ok, err := lock.AcquireWait(context.Background()); err != nil { log.Debug("获取锁失败", err) @@ -110,29 +131,21 @@ func futTriggerOrder(db *gorm.DB, v *dto.PreOrderRedisList, item string, futApi Quantity: num, NewClientOrderId: v.OrderSn, } - preOrderVal, _ := sonic.MarshalString(&v) - if err := futApi.OrderPlaceLoop(db, params, 3); err != nil { log.Error("下单失败", v.Symbol, " err:", err) - err := db.Model(&DbModels.LinePreOrder{}).Where("id =? and status='0'", preOrder.Id).Updates(map[string]interface{}{"status": "2", "desc": err.Error()}).Error + err = db.Model(&DbModels.LinePreOrder{}).Where("id =? and status='0'", preOrder.Id).Updates(map[string]interface{}{"status": "2", "desc": err.Error()}).Error if err != nil { log.Error("更新预下单状态失败") } - - if preOrderVal != "" { - if _, err := helper.DefaultRedis.LRem(key, preOrderVal); err != nil { - log.Error("删除redis 预下单失败:", err) - } + if _, err := helper.DefaultRedis.LRem(key, item); err != nil { + log.Error("删除redis 预下单失败:", err) } return } - - if preOrderVal != "" { - if _, err := helper.DefaultRedis.LRem(key, preOrderVal); err != nil { - log.Error("删除redis 预下单失败:", err) - } + if _, err := helper.DefaultRedis.LRem(key, item); err != nil { + log.Error("删除redis 预下单失败:", err) } if err := db.Model(&DbModels.LinePreOrder{}).Where("id =? ", preOrder.Id).Updates(map[string]interface{}{"trigger_time": time.Now()}).Error; err != nil { @@ -303,7 +316,7 @@ func FuturesReduceTrigger(db *gorm.DB, reduceOrder positiondto.ReduceListItem, f apiInfo, _ := GetApiInfo(reduceOrder.ApiId) if apiInfo.Id == 0 { - log.Error("现货减仓 查询api用户不存在") + log.Error("合约减仓 查询api用户不存在") return false } for _, takeOrder := range takeOrders { @@ -399,6 +412,12 @@ func JudgeFutAddPosition(trade models.TradeSet) { } // 合约加仓触发 +// 参数说明: +// - db: 数据库连接 +// - v: 加仓触发信息 +// - item: Redis 列表原始条目,用于精确删除 +// - futApi: 合约下单 API 封装 +// 逻辑:在 FutTrigger 锁下校验缓存与订单有效性,按系统加仓溢价调整价格,触发限价下单并删除原始缓存条目。 func FutAddPositionTrigger(db *gorm.DB, v *positiondto.AddPositionList, item string, futApi FutRestApi) { lock := helper.NewRedisLock(fmt.Sprintf(rediskey.FutTrigger, v.ApiId, v.Symbol), 20, 5, 100*time.Millisecond) @@ -452,29 +471,21 @@ func FutAddPositionTrigger(db *gorm.DB, v *positiondto.AddPositionList, item str Quantity: num.Truncate(int32(tradeSet.AmountDigit)), NewClientOrderId: v.OrderSn, } - preOrderVal, _ := sonic.MarshalString(&v) - if err := futApi.OrderPlaceLoop(db, params, 3); err != nil { log.Error("下单失败", v.Symbol, " err:", err) - err := db.Model(&DbModels.LinePreOrder{}).Where("id =? AND status =0", preOrder.Id).Updates(map[string]interface{}{"status": "2", "desc": err.Error()}).Error + err = db.Model(&DbModels.LinePreOrder{}).Where("id =? AND status =0", preOrder.Id).Updates(map[string]interface{}{"status": "2", "desc": err.Error()}).Error if err != nil { log.Error("下单失败后修改订单失败") } - - if preOrderVal != "" { - if _, err := helper.DefaultRedis.LRem(key, preOrderVal); err != nil { - log.Error("删除redis 预下单失败:", err) - } + if _, err := helper.DefaultRedis.LRem(key, item); err != nil { + log.Error("删除redis 预下单失败:", err) } return } - - if preOrderVal != "" { - if _, err := helper.DefaultRedis.LRem(key, preOrderVal); err != nil { - log.Error("删除redis 预下单失败:", err) - } + if _, err := helper.DefaultRedis.LRem(key, item); err != nil { + log.Error("删除redis 预下单失败:", err) } if err := db.Model(&DbModels.LinePreOrder{}).Where("id =? ", preOrder.Id).Updates(map[string]interface{}{"trigger_time": time.Now()}).Error; err != nil { @@ -490,3 +501,66 @@ func FutAddPositionTrigger(db *gorm.DB, v *positiondto.AddPositionList, item str return } } + +// 触发止盈单 +// Deprecated 暂时不用了 +func FuturesTakeTrigger(db *gorm.DB, spotApi *SpotRestApi, takeOrder dto.TakeProfitRedisList, key, item string, futApi FutRestApi) { + // tradeSet, err := cacheservice.GetTradeSet(global.EXCHANGE_BINANCE, takeOrder.Symbol, 2) + + // if err != nil { + // logger.Errorf("触发止盈单 查询交易对失败 交易对:%s ordersn:%s err:%v", takeOrder.Symbol, takeOrder.OrderSn, err) + // } + lock := helper.NewRedisLock(fmt.Sprintf(rediskey.FutTrigger, takeOrder.ApiId, takeOrder.Symbol), 20, 5, 100*time.Millisecond) + + if ok, err := lock.AcquireWait(context.Background()); err != nil { + log.Error("获取锁失败", err) + return + } else if ok { + defer lock.Release() + + hasrecord, _ := helper.DefaultRedis.IsElementInList(key, item) + + if !hasrecord { + log.Debug("止损缓存中不存在", item) + return + } + + apiInfo, _ := GetApiInfo(takeOrder.ApiId) + + if apiInfo.Id == 0 { + log.Error("现货止盈 查询api用户不存在") + return + } + + // price := takeOrder.Price.Mul(decimal.NewFromInt(1).Sub(setting.StopLossPremium.Div(decimal.NewFromInt(100)))).Truncate(int32(tradeSet.PriceDigit)) + // num := utility.StrToDecimal(stopOrder.Num).Truncate(int32(tradeSet.AmountDigit)) + + params := OrderPlacementService{ + ApiId: takeOrder.ApiId, + Side: takeOrder.Site, + Type: "LIMIT", + TimeInForce: "GTC", + Symbol: takeOrder.Symbol, + // Price: price, + // Quantity: num, + NewClientOrderId: takeOrder.OrderSn, + } + + if err := spotApi.OrderPlaceLoop(db, params, 3); err != nil { + log.Errorf("现货止盈挂单失败 id:%s err:%v", takeOrder.Id, err) + } else { + if err := db.Model(&DbModels.LinePreOrder{}). + Where("id = ? ", takeOrder.Id). + Updates(map[string]interface{}{"trigger_time": time.Now()}).Error; err != nil { + log.Errorf("现货止盈更新状态失败 id:%s err:%v", takeOrder.Id, err) + } + } + + if _, err := helper.DefaultRedis.LRem(key, item); err != nil { + log.Errorf("现货止盈 删除缓存失败 id:%v err:%v", takeOrder.Id, err) + } + + } else { + logger.Errorf("触发止盈单 不存在待触发主单 %s", item) + } +} diff --git a/services/binanceservice/futuresrest.go b/services/binanceservice/futuresrest.go index a1b02a5..73ce5be 100644 --- a/services/binanceservice/futuresrest.go +++ b/services/binanceservice/futuresrest.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "go-admin/app/admin/models" + DbModels "go-admin/app/admin/models" "go-admin/app/admin/service/dto" "go-admin/common/const/rediskey" @@ -20,7 +20,6 @@ import ( "github.com/bytedance/sonic" "github.com/go-admin-team/go-admin-core/logger" - log "github.com/go-admin-team/go-admin-core/logger" "github.com/shopspring/decimal" "gorm.io/gorm" ) @@ -137,7 +136,7 @@ func handleReduceFilled(db *gorm.DB, preOrder *DbModels.LinePreOrder) { //修改减仓单减仓策略状态 ReduceCallBack(db, preOrder) - orderExt := models.LinePreOrderExt{} + orderExt := DbModels.LinePreOrderExt{} //减仓策略单获取主减仓单的拓展信息 if preOrder.ReduceOrderId > 0 { db.Model(&orderExt).Where("order_id =?", preOrder.ReduceOrderId).First(&orderExt) @@ -162,7 +161,7 @@ func handleReduceFilled(db *gorm.DB, preOrder *DbModels.LinePreOrder) { lock := helper.NewRedisLock(fmt.Sprintf(rediskey.FutReducecCallback, preOrder.ApiId, preOrder.Symbol), 120, 20, 100*time.Millisecond) if ok, err := lock.AcquireWait(context.Background()); err != nil { - log.Error("获取锁失败", err) + logger.Error("获取锁失败", err) return } else if ok { defer lock.Release() @@ -183,8 +182,8 @@ func handleReduceFilled(db *gorm.DB, preOrder *DbModels.LinePreOrder) { // 减仓处理止盈止损 func FutTakeProfit(db *gorm.DB, preOrder *DbModels.LinePreOrder, apiUserInfo DbModels.LineApiUser, tradeSet models2.TradeSet, positionData positiondto.PositionDto, orderExt DbModels.LinePreOrderExt, manualTakeRatio, manualStopRatio decimal.Decimal) bool { - orders := make([]models.LinePreOrder, 0) - if err := db.Model(&models.LinePreOrder{}).Where("pid =? AND order_type IN (1,2) AND status = 0", preOrder.Id).Find(&orders).Error; err != nil { + orders := make([]DbModels.LinePreOrder, 0) + if err := db.Model(&DbModels.LinePreOrder{}).Where("pid =? AND order_type IN (1,2) AND status = 0", preOrder.Id).Find(&orders).Error; err != nil { logger.Errorf("handleMainReduceFilled 获取待触发订单失败,订单号:%s", preOrder.OrderSn) return true } @@ -254,12 +253,12 @@ func nextFuturesReduceTrigger(db *gorm.DB, mainId int, totalNum decimal.Decimal, nextOrder := DbModels.LinePreOrder{} nextExt := DbModels.LinePreOrderExt{} - if err := db.Model(&models.LinePreOrder{}).Where("main_id =? AND order_type =4 AND status=0 AND reduce_order_id=0", mainId).Order("rate asc").First(&nextOrder).Error; err != nil { + if err := db.Model(&DbModels.LinePreOrder{}).Where("main_id =? AND order_type =4 AND status=0 AND reduce_order_id=0", mainId).Order("rate asc").First(&nextOrder).Error; err != nil { logger.Errorf("获取下一个单失败 err:%v", err) return } - if err := db.Model(&models.LinePreOrderExt{}).Where("order_id =? and add_type =2", nextOrder.Id).First(&nextExt).Error; err != nil { + if err := db.Model(&DbModels.LinePreOrderExt{}).Where("order_id =? and add_type =2", nextOrder.Id).First(&nextExt).Error; err != nil { logger.Errorf("获取下一个单失败 err:%v", err) } @@ -482,7 +481,7 @@ func removeFutLossAndAddPosition(mainId int, orderSn string) { } // 处理主单成交,处理止盈、止损、减仓订单 -func handleFutMainOrderFilled(db *gorm.DB, preOrder *models.LinePreOrder, extOrderId int, first bool) { +func handleFutMainOrderFilled(db *gorm.DB, preOrder *DbModels.LinePreOrder, extOrderId int, first bool) { // 获取交易对配置和API信息 tradeSet, err := cacheservice.GetTradeSet(global.EXCHANGE_BINANCE, preOrder.Symbol, 2) mainId := preOrder.Id @@ -503,7 +502,7 @@ func handleFutMainOrderFilled(db *gorm.DB, preOrder *models.LinePreOrder, extOrd // 处理主单加仓 if preOrder.OrderCategory == 3 { - if err := handleMainOrderAddPosition(db, preOrder); err != nil { + if err = handleMainOrderAddPosition(db, preOrder); err != nil { logger.Errorf("处理主单加仓失败, 主单号:%s, 错误信息: %v", preOrder.MainId, err) return } @@ -557,7 +556,7 @@ func handleFutMainOrderFilled(db *gorm.DB, preOrder *models.LinePreOrder, extOrd // 获取和保存持仓数据 positionData := savePosition(db, preOrder) - orderExt := models.LinePreOrderExt{} + orderExt := DbModels.LinePreOrderExt{} db.Model(&orderExt).Where("order_id =?", extOrderId).First(&orderExt) totalNum := getFuturesPositionAvailableQuantity(db, apiInfo, preOrder, tradeSet).Truncate(int32(tradeSet.AmountDigit)) price := utility.StrToDecimal(preOrder.Price).Truncate(int32(tradeSet.PriceDigit)) @@ -616,7 +615,7 @@ func handleFutMainOrderFilled(db *gorm.DB, preOrder *models.LinePreOrder, extOrd } // 处理主单加仓 -func handleMainOrderAddPosition(db *gorm.DB, preOrder *models.LinePreOrder) error { +func handleMainOrderAddPosition(db *gorm.DB, preOrder *DbModels.LinePreOrder) error { // 更新加仓状态 if err := db.Model(&DbModels.LinePreOrderStatus{}). Where("order_id = ?", preOrder.MainId). @@ -630,7 +629,7 @@ func handleMainOrderAddPosition(db *gorm.DB, preOrder *models.LinePreOrder) erro // 取消主单相关订单 // changeMainOrderStatus 是否修改主单状态 -func cancelPositionOtherOrders(apiUserInfo DbModels.LineApiUser, db *gorm.DB, preOrder *models.LinePreOrder, changeMainOrderStatus bool) error { +func cancelPositionOtherOrders(apiUserInfo DbModels.LineApiUser, db *gorm.DB, preOrder *DbModels.LinePreOrder, changeMainOrderStatus bool) error { mainOrders, err := getOpenPositionMainOrderId(db, preOrder.Id, preOrder.ApiId, preOrder.SymbolType, preOrder.ExchangeType, preOrder.Symbol, preOrder.Site) if err != nil { return err @@ -689,8 +688,8 @@ func CancelMainOrders(mainIds []int, db *gorm.DB, apiUserInfo DbModels.LineApiUs } // 获取止盈止损订单 -func getStopOrders(db *gorm.DB, preOrder *models.LinePreOrder) ([]models.LinePreOrder, error) { - var orders []models.LinePreOrder +func getStopOrders(db *gorm.DB, preOrder *DbModels.LinePreOrder) ([]DbModels.LinePreOrder, error) { + var orders []DbModels.LinePreOrder if err := db.Model(&DbModels.LinePreOrder{}). Where("pid = ? AND order_type > 0 AND status = '0' ", preOrder.Id). Find(&orders).Error; err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { @@ -700,7 +699,7 @@ func getStopOrders(db *gorm.DB, preOrder *models.LinePreOrder) ([]models.LinePre } // 更新订单数量 -func updateOrderQuantity(db *gorm.DB, order models.LinePreOrder, preOrder *models.LinePreOrder, ext *models.LinePreOrderExt, num decimal.Decimal, first bool, tradeSet models2.TradeSet) decimal.Decimal { +func updateOrderQuantity(db *gorm.DB, order DbModels.LinePreOrder, preOrder *DbModels.LinePreOrder, ext *DbModels.LinePreOrderExt, num decimal.Decimal, first bool, tradeSet models2.TradeSet) decimal.Decimal { // 处理减仓比例 // if order.OrderType == 4 && ext.ReduceNumRatio.Cmp(decimal.Zero) > 0 { // // 计算减仓数量 @@ -755,7 +754,8 @@ func processFutReduceOrder(order DbModels.LinePreOrder, price, num decimal.Decim } // 处理止盈订单 -func processFutTakeProfitOrder(db *gorm.DB, futApi FutRestApi, order models.LinePreOrder, num decimal.Decimal) { +// Deprecated 请使用 processFutTakeProfitOrderNew +func processFutTakeProfitOrder(db *gorm.DB, futApi FutRestApi, order DbModels.LinePreOrder, num decimal.Decimal) { price, _ := decimal.NewFromString(order.Price) tradeSet, _ := cacheservice.GetTradeSet(global.EXCHANGE_BINANCE, order.Symbol, 2) @@ -774,7 +774,7 @@ func processFutTakeProfitOrder(db *gorm.DB, futApi FutRestApi, order models.Line if err != nil { logger.Error("合约止盈下单失败:", order.OrderSn, " err:", err) - if err := db.Model(&DbModels.LinePreOrder{}).Where("id = ?", order.Id). + if err = db.Model(&DbModels.LinePreOrder{}).Where("id = ?", order.Id). Updates(map[string]interface{}{"status": "2", "desc": err.Error(), "num": params.Quantity, "rate": order.Rate, "price": order.Price}).Error; err != nil { logger.Error("合约止盈下单失败,更新状态失败:", order.OrderSn, " err:", err) } @@ -791,9 +791,38 @@ func processFutTakeProfitOrder(db *gorm.DB, futApi FutRestApi, order models.Line } } +// 写入待触发合约止盈单 +func processFutTakeProfitOrderNew(db *gorm.DB, futApi FutRestApi, order DbModels.LinePreOrder, num decimal.Decimal) { + price, _ := decimal.NewFromString(order.Price) + + takeOrder := dto.TakeProfitRedisList{ + Id: order.Id, + ApiId: order.ApiId, + PId: order.Pid, + MainId: order.MainId, + Price: price, + Site: order.Site, + Symbol: order.Symbol, + SymbolType: order.SymbolType, + OrderCategory: order.OrderCategory, + OrderSn: order.OrderSn, + } + + key := fmt.Sprintf(rediskey.FuturesTakeList, global.EXCHANGE_BINANCE) + val, err := sonic.MarshalString(&takeOrder) + if err != nil { + logger.Errorf("序列化失败 err:%v", err) + return + } + if err := helper.DefaultRedis.RPushList(key, val); err != nil { + logger.Errorf("合约止盈单写入缓存失败 err:%v", err) + return + } +} + // 处理止损订单 -// order 止损单 -func processFutStopLossOrder(db *gorm.DB, order models.LinePreOrder, price, num decimal.Decimal) error { +// Deprecated 请使用 processFutStopLossOrderNew +func processFutStopLossOrder(db *gorm.DB, order DbModels.LinePreOrder, price, num decimal.Decimal) error { tradeSet, _ := cacheservice.GetTradeSet(global.EXCHANGE_BINANCE, order.Symbol, 2) params := FutOrderPlace{ diff --git a/services/binanceservice/spotjudgeservice.go b/services/binanceservice/spotjudgeservice.go index bd685fe..98b5359 100644 --- a/services/binanceservice/spotjudgeservice.go +++ b/services/binanceservice/spotjudgeservice.go @@ -56,9 +56,11 @@ func JudgeSpotPrice(trade models.TradeSet) { apiInfo, _ := GetApiInfo(preOrder.ApiId) if err := commonservice.JudgeWebsocketTimeout(apiInfo.ApiKey, 1); err != nil { - log.Errorf("现货行情订阅超时,apiKey:%s err:%v", err) + log.Errorf("现货行情订阅超时,apiKey:%s err:%v", apiInfo.ApiKey, err) - if err1 := orderService.ErrorTrigger(&preOrder, fmt.Sprintf("行情触发失败,err:%v", err)); err1 != nil { + if err1 := orderService.ErrorTrigger(preOrder.Id, 1, + global.EXCHANGE_BINANCE, item, + fmt.Sprintf("行情触发失败,err:%v", err)); err1 != nil { log.Error("触发失败", err1) } continue @@ -70,9 +72,8 @@ func JudgeSpotPrice(trade models.TradeSet) { } } -// 分布式锁下单 -// v 预下单信息 -// item 预下单源文本 +// 分布式锁下单(现货预设主单触发) +// v: 预下单信息;item: Redis 原始条目(用于准确删除) func SpotOrderLock(db *gorm.DB, v *dto.PreOrderRedisList, item string, spotApi SpotRestApi) { lock := helper.NewRedisLock(fmt.Sprintf(rediskey.SpotTrigger, v.ApiId, v.Symbol), 20, 5, 100*time.Millisecond) @@ -114,29 +115,21 @@ func SpotOrderLock(db *gorm.DB, v *dto.PreOrderRedisList, item string, spotApi S Quantity: num, NewClientOrderId: v.OrderSn, } - preOrderVal, _ := sonic.MarshalString(&v) - if err := spotApi.OrderPlaceLoop(db, params, 3); err != nil { log.Error("下单失败", v.Symbol, " err:", err) - err := db.Model(&DbModels.LinePreOrder{}).Where("id =? AND status =0", preOrder.Id).Updates(map[string]interface{}{"status": "2", "desc": err.Error()}).Error + err = db.Model(&DbModels.LinePreOrder{}).Where("id =? AND status =0", preOrder.Id).Updates(map[string]interface{}{"status": "2", "desc": err.Error()}).Error if err != nil { log.Error("下单失败后修改订单失败") } - - if preOrderVal != "" { - if _, err := helper.DefaultRedis.LRem(key, preOrderVal); err != nil { - log.Error("删除redis 预下单失败:", err) - } + if _, err := helper.DefaultRedis.LRem(key, item); err != nil { + log.Error("删除redis 预下单失败:", err) } return } - - if preOrderVal != "" { - if _, err := helper.DefaultRedis.LRem(key, preOrderVal); err != nil { - log.Error("删除redis 预下单失败:", err) - } + if _, err := helper.DefaultRedis.LRem(key, item); err != nil { + log.Error("删除redis 预下单失败:", err) } if err := db.Model(&DbModels.LinePreOrder{}).Where("id =? ", preOrder.Id).Updates(map[string]interface{}{"trigger_time": time.Now()}).Error; err != nil { @@ -214,7 +207,7 @@ func SpotStopLossTrigger(db *gorm.DB, stopOrder dto.StopLossRedisList, spotApi S hasrecord, _ := helper.DefaultRedis.IsElementInList(key, item) if !hasrecord { - log.Debug("减仓缓存中不存在", item) + log.Debug("止损缓存中不存在", item) return } @@ -225,17 +218,8 @@ func SpotStopLossTrigger(db *gorm.DB, stopOrder dto.StopLossRedisList, spotApi S return } - var err error - for x := 1; x <= 4; x++ { - err = spotApi.CancelOpenOrderByOrderSn(apiInfo, takeOrder.Symbol, takeOrder.OrderSn) - - if err == nil || strings.Contains(err.Error(), "该交易对没有订单") { - err = nil - break - } - } - - if err != nil { + // 使用统一的撤单重试封装,确保行为一致且高效 + if err := CancelOpenOrderByOrderSnLoop(apiInfo, takeOrder.Symbol, takeOrder.OrderSn); err != nil { log.Error("现货止损撤单失败", err) return } @@ -265,11 +249,9 @@ func SpotStopLossTrigger(db *gorm.DB, stopOrder dto.StopLossRedisList, spotApi S if _, err := helper.DefaultRedis.LRem(key, item); err != nil { log.Errorf("现货止损 删除缓存失败 id:%v err:%v", stopOrder.Id, err) } - } else { log.Error("获取锁失败") } - } // 判断是否触发现货减仓 @@ -310,7 +292,7 @@ func JudgeSpotReduce(trade models.TradeSet) { return } else if ok { defer lock.Release() - hasrecord, _ := helper.DefaultRedis.HExists(reduceReduceListKey, utility.IntTostring(reduceOrderStrategy.OrderId), item) + hasrecord, _ := helper.DefaultRedis.HExists(reduceReduceListKey, utility.IntToString(reduceOrderStrategy.OrderId), item) if !hasrecord { log.Debug("减仓缓存中不存在", item) @@ -515,6 +497,8 @@ func JudgeSpotAddPosition(trade models.TradeSet) { } } +// 现货加仓触发 +// v: 加仓触发信息;item: Redis 原始条目(用于准确删除) func SpotAddPositionTrigger(db *gorm.DB, v *positiondto.AddPositionList, item string, spotApi SpotRestApi) { lock := helper.NewRedisLock(fmt.Sprintf(rediskey.SpotTrigger, v.ApiId, v.Symbol), 20, 5, 100*time.Millisecond) @@ -569,8 +553,6 @@ func SpotAddPositionTrigger(db *gorm.DB, v *positiondto.AddPositionList, item st Quantity: num.Truncate(int32(tradeSet.AmountDigit)), NewClientOrderId: v.OrderSn, } - preOrderVal, _ := sonic.MarshalString(&v) - if err := spotApi.OrderPlaceLoop(db, params, 3); err != nil { log.Error("下单失败", v.Symbol, " err:", err) err := db.Model(&DbModels.LinePreOrder{}).Where("id =? AND status =0", preOrder.Id).Updates(map[string]interface{}{"status": "2", "desc": err.Error()}).Error @@ -578,20 +560,14 @@ func SpotAddPositionTrigger(db *gorm.DB, v *positiondto.AddPositionList, item st if err != nil { log.Error("下单失败后修改订单失败") } - - if preOrderVal != "" { - if _, err := helper.DefaultRedis.LRem(key, preOrderVal); err != nil { - log.Error("删除redis 预下单失败:", err) - } + if _, err := helper.DefaultRedis.LRem(key, item); err != nil { + log.Error("删除redis 预下单失败:", err) } return } - - if preOrderVal != "" { - if _, err := helper.DefaultRedis.LRem(key, preOrderVal); err != nil { - log.Error("删除redis 预下单失败:", err) - } + if _, err := helper.DefaultRedis.LRem(key, item); err != nil { + log.Error("删除redis 预下单失败:", err) } if err := db.Model(&DbModels.LinePreOrder{}).Where("id =?", preOrder.Id).Updates(map[string]interface{}{"trigger_time": time.Now()}).Error; err != nil { @@ -607,3 +583,71 @@ func SpotAddPositionTrigger(db *gorm.DB, v *positiondto.AddPositionList, item st return } } + +// 触发现货止盈 +func SpotTakeProfitTrigger(db *gorm.DB, takeOrder dto.TakeProfitRedisList, spotApi SpotRestApi, setting DbModels.LineSystemSetting, tradeSet models.TradeSet, key string, item string) { + lock := helper.NewRedisLock(fmt.Sprintf(rediskey.SpotTrigger, takeOrder.ApiId, takeOrder.Symbol), 20, 5, 100*time.Millisecond) + + if ok, err := lock.AcquireWait(context.Background()); err != nil { + log.Error("获取锁失败", err) + return + } else if ok { + defer lock.Release() + stopOrder := DbModels.LinePreOrder{} + if err := db.Model(&DbModels.LinePreOrder{}). + Where("pid =? AND order_type =2", takeOrder.PId). + Find(&stopOrder).Error; err != nil { + log.Error("查询止盈单失败") + return + } + + hasrecord, _ := helper.DefaultRedis.IsElementInList(key, item) + + if !hasrecord { + log.Debug("止损缓存中不存在", item) + return + } + + apiInfo, _ := GetApiInfo(takeOrder.ApiId) + + if apiInfo.Id == 0 { + log.Error("现货止盈 查询api用户不存在") + return + } + + // 使用统一的撤单重试封装,确保行为一致且高效 + if err := CancelOpenOrderByOrderSnLoop(apiInfo, stopOrder.Symbol, stopOrder.OrderSn); err != nil { + log.Error("现货止盈撤单失败", err) + return + } + price := takeOrder.Price.Mul(decimal.NewFromInt(1).Sub(setting.StopLossPremium.Div(decimal.NewFromInt(100)))).Truncate(int32(tradeSet.PriceDigit)) + num := utility.StrToDecimal(stopOrder.Num).Truncate(int32(tradeSet.AmountDigit)) + + params := OrderPlacementService{ + ApiId: takeOrder.ApiId, + Side: takeOrder.Site, + Type: "TAKE_PROFIT_Market", + TimeInForce: "GTC", + Symbol: takeOrder.Symbol, + Price: price, + Quantity: num, + NewClientOrderId: takeOrder.OrderSn, + } + + if err := spotApi.OrderPlaceLoop(db, params, 3); err != nil { + log.Errorf("现货止盈挂单失败 id:%s err:%v", takeOrder.Id, err) + } else { + if err := db.Model(&DbModels.LinePreOrder{}). + Where("id = ? ", takeOrder.Id). + Updates(map[string]interface{}{"trigger_time": time.Now()}).Error; err != nil { + log.Errorf("现货止盈更新状态失败 id:%s err:%v", takeOrder.Id, err) + } + } + + if _, err := helper.DefaultRedis.LRem(key, item); err != nil { + log.Errorf("现货止盈 删除缓存失败 id:%v err:%v", takeOrder.Id, err) + } + } else { + log.Error("获取锁失败") + } +} diff --git a/services/binanceservice/spotreset.go b/services/binanceservice/spotreset.go index b867787..5031a9c 100644 --- a/services/binanceservice/spotreset.go +++ b/services/binanceservice/spotreset.go @@ -785,9 +785,6 @@ func processTakeProfitAndStopLossOrders(db *gorm.DB, preOrder *models.LinePreOrd totalNum := num.Mul(decimal.NewFromFloat(0.998)).Truncate(int32(tradeSet.AmountDigit)) num = totalNum - // if orderExt.TakeProfitNumRatio.Cmp(decimal.Zero) > 0 && orderExt.TakeProfitNumRatio.Cmp(decimal.NewFromInt(100)) < 0 { - // num = num.Mul(orderExt.TakeProfitNumRatio.Div(decimal.NewFromInt(100))).Truncate(int32(tradeSet.AmountDigit)) - // } //止盈止损 for _, order := range orders { order.Num = num.String() @@ -828,8 +825,6 @@ func processTakeProfitAndStopLossOrders(db *gorm.DB, preOrder *models.LinePreOrd } processStopLossOrder(order) - // case 4: //减仓 - // processSpotReduceOrder(order) } } @@ -908,7 +903,7 @@ func processTakeProfitOrder(db *gorm.DB, spotApi SpotRestApi, order models.LineP if err != nil { logger.Error("现货止盈下单失败:", order.OrderSn, " err:", err) - if err := db.Model(&DbModels.LinePreOrder{}).Where("id = ?", order.Id). + if err = db.Model(&DbModels.LinePreOrder{}).Where("id = ?", order.Id). Updates(map[string]interface{}{"status": "2", "desc": err.Error(), "num": params.Quantity, "rate": order.Rate, "price": order.Price}).Error; err != nil { logger.Error("现货止盈下单失败,更新状态失败:", order.OrderSn, " err:", err) } @@ -924,8 +919,47 @@ func processTakeProfitOrder(db *gorm.DB, spotApi SpotRestApi, order models.LineP } } +// 止盈 放到止盈缓存中 +// Deprecated 暂时没用 +func processTakeProfitOrderNew(db *gorm.DB, spotApi SpotRestApi, order models.LinePreOrder) error { + tradeSet, _ := cacheservice.GetTradeSet(global.EXCHANGE_BINANCE, order.Symbol, 1) + + if tradeSet.Coin == "" { + logger.Error("获取交易对失败") + return errors.New("获取交易对失败") + } + + price, _ := decimal.NewFromString(order.Price) + price = price.Truncate(int32(tradeSet.PriceDigit)) + takeOrder := dto.TakeProfitRedisList{ + Id: order.Id, + MainId: order.MainId, + PId: order.Pid, + ApiId: order.ApiId, + OrderTye: order.OrderType, + OrderCategory: order.OrderCategory, + Price: price, + SymbolType: order.SymbolType, + Symbol: order.Symbol, + Site: order.Site, + OrderSn: order.OrderSn, + } + + takeVal, _ := sonic.MarshalString(&takeOrder) + takeKey := fmt.Sprintf(rediskey.SpotTakeList, global.EXCHANGE_BINANCE) + + if takeVal == "" { + return errors.New("takeVal is empty") + } + + if err := helper.DefaultRedis.RPushList(takeKey, takeVal); err != nil { + return err + } + + return nil +} + // 处理止损订单 -// order 止损单 func processStopLossOrder(order models.LinePreOrder) error { price := utility.StrToDecimal(order.Price) stopLoss := dto.StopLossRedisList{ @@ -955,6 +989,52 @@ func processStopLossOrder(order models.LinePreOrder) error { return nil } +// 处理止损订单 直接下到交易所 +// Deprecated: 现在不用,等止盈不挂到交易所得时候使用 +func processStopLossOrderNew(db *gorm.DB, spotApi SpotRestApi, order models.LinePreOrder) error { + tradeSet, _ := cacheservice.GetTradeSet(global.EXCHANGE_BINANCE, order.Symbol, 1) + + if tradeSet.Coin == "" { + logger.Error("获取交易对失败") + return errors.New("获取交易对失败") + } + + price, _ := decimal.NewFromString(order.Price) + + params := OrderPlacementService{ + ApiId: order.ApiId, + Symbol: order.Symbol, + Side: order.Site, + Price: price.Truncate(int32(tradeSet.PriceDigit)), + Quantity: utility.StrToDecimal(order.Num), + Type: "STOP_LOSS_LIMIT", + TimeInForce: "GTC", + StopPrice: price.Truncate(int32(tradeSet.PriceDigit)), + NewClientOrderId: order.OrderSn, + } + + err := spotApi.OrderPlaceLoop(db, params, 3) + + if err != nil { + logger.Error("现货止损下单失败:", order.OrderSn, " err:", err) + if err = db.Model(&DbModels.LinePreOrder{}).Where("id = ?", order.Id). + Updates(map[string]interface{}{"status": "2", "desc": err.Error(), "num": params.Quantity, "rate": order.Rate, "price": order.Price}).Error; err != nil { + logger.Error("现货止损下单失败,更新状态失败:", order.OrderSn, " err:", err) + } + } else { + if err := db.Model(&DbModels.LinePreOrder{}).Where("id =? ", order.Id). + Updates(map[string]interface{}{"trigger_time": time.Now(), "rate": order.Rate, "price": order.Price, "num": order.Num}).Error; err != nil { + logger.Error("更新现货止损单触发事件 ordersn:", order.OrderSn) + } + if err := db.Model(&DbModels.LinePreOrder{}).Where("id = ? and status ='0'", order.Id). + Updates(map[string]interface{}{"status": "1"}).Error; err != nil { + logger.Error("现货止损下单成功,更新状态失败:", order.OrderSn, " err:", err) + } + } + + return nil +} + // 循环取消订单 func CancelOpenOrderByOrderSnLoop(apiInfo DbModels.LineApiUser, symbol string, orderSn string) error { spotApi := SpotRestApi{} diff --git a/services/excservice/binancesocketmanager.go b/services/excservice/binancesocketmanager.go index f3f6a15..a6357b4 100644 --- a/services/excservice/binancesocketmanager.go +++ b/services/excservice/binancesocketmanager.go @@ -11,8 +11,6 @@ import ( "go-admin/pkg/jsonhelper" "go-admin/pkg/utility" "go-admin/services/proxy" - "math/rand" - "strconv" "strings" "sync" "sync/atomic" @@ -35,7 +33,7 @@ type BinanceWebSocketManager struct { proxyType string proxyAddress string reconnect chan struct{} - isStopped bool + isStopped atomic.Bool cancelFunc context.CancelFunc listenKey string reconnecting atomic.Bool @@ -44,8 +42,6 @@ type BinanceWebSocketManager struct { forceReconnectTimer *time.Timer } -// ... (省略中间代码) - // sendPing 使用锁来确保发送ping时的线程安全 func (wm *BinanceWebSocketManager) sendPing() error { wm.mu.Lock() @@ -54,18 +50,19 @@ func (wm *BinanceWebSocketManager) sendPing() error { return errors.New("websocket connection is nil") } wm.lastPingTime.Store(time.Now()) - return wm.ws.WriteMessage(websocket.PingMessage, []byte("ping")) + // 使用控制帧发送 Ping,避免与普通消息写入竞争 + return wm.ws.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(10*time.Second)) } // Stop 优雅地停止WebSocket管理器 func (wm *BinanceWebSocketManager) Stop() { wm.stopOnce.Do(func() { wm.mu.Lock() - if wm.isStopped { + if wm.isStopped.Load() { wm.mu.Unlock() return } - wm.isStopped = true + wm.isStopped.Store(true) wm.mu.Unlock() if wm.cancelFunc != nil { @@ -92,9 +89,11 @@ func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) { for { select { case <-wm.reconnect: - if !wm.reconnecting.CompareAndSwap(false, true) { - continue // 如果已经在重连,则忽略 - } + // 当收到重连信号时,不再依赖 CompareAndSwap 直接跳过的逻辑。 + // 之前的实现中,triggerReconnect 已将 reconnecting 标记为 true, + // 导致这里 CompareAndSwap 失败而 continue,从而丢失首次重连信号。 + // 修复:收到信号后直接置位为重连状态并继续执行重连流程,保证首次重连一定发生。 + wm.reconnecting.Store(true) // 在开始重连循环之前,先安全地关闭旧的连接 wm.closeConn() @@ -108,10 +107,11 @@ func (wm *BinanceWebSocketManager) handleReconnect(ctx context.Context) { default: } - log.Infof("WebSocket (%s) 正在尝试重连,第 %d 次...", wm.wsType, retryCount+1) + log.Infof("WebSocket (%d) 正在尝试重连,第 %d 次...", wm.wsType, retryCount+1) if err := wm.connect(ctx); err == nil { - log.Infof("WebSocket (%s) 重连成功", wm.wsType) + log.Infof("WebSocket (%d) 重连成功", wm.wsType) wm.reconnecting.Store(false) + setLastTime(wm) go wm.startDeadCheck(ctx) // 重连成功后,重新启动假死检测 break // 跳出重连循环 } @@ -163,7 +163,6 @@ func NewBinanceWebSocketManager(wsType int, apiKey, apiSecret, proxyType, proxyA wm := &BinanceWebSocketManager{ stopChannel: make(chan struct{}, 10), reconnect: make(chan struct{}, 10), - isStopped: false, url: url, wsType: wsType, apiKey: apiKey, @@ -174,6 +173,7 @@ func NewBinanceWebSocketManager(wsType int, apiKey, apiSecret, proxyType, proxyA // 初始化最后ping时间 wm.lastPingTime.Store(time.Now()) + wm.isStopped.Store(false) return wm } @@ -210,8 +210,8 @@ func (wm *BinanceWebSocketManager) Restart(apiKey, apiSecret, proxyType, proxyAd wm.proxyType = proxyType wm.proxyAddress = proxyAddress - if wm.isStopped { - wm.isStopped = false + if wm.isStopped.Load() { + wm.isStopped.Store(false) utility.SafeGo(wm.run) } else { log.Warnf("调用restart") @@ -265,6 +265,10 @@ func (wm *BinanceWebSocketManager) run() { errMessage := commondto.WebSocketErr{Time: time.Now()} helper.DefaultRedis.SetString(errKey, jsonhelper.ToJsonString(errMessage)) + // 在主循环前统一启动重连与假死检测,避免重复启动 + utility.SafeGo(func() { wm.handleReconnect(ctx) }) + utility.SafeGo(func() { wm.startDeadCheck(ctx) }) + for { select { case <-ctx.Done(): @@ -274,7 +278,7 @@ func (wm *BinanceWebSocketManager) run() { wm.handleConnectionError(errKey, err) if wm.isErrorCountExceeded(errKey) { - log.Error("连接 %s WebSocket 时出错次数过多,停止 WebSocket 管理器: %v", wm.wsType, wm.apiKey) + log.Errorf("连接 %s WebSocket 时出错次数过多,停止 WebSocket 管理器: %v", wm.wsType, wm.apiKey) wm.Stop() return } @@ -284,7 +288,7 @@ func (wm *BinanceWebSocketManager) run() { } <-wm.stopChannel - log.Info("停止 %s WebSocket 管理器...", getWsTypeName(wm.wsType)) + log.Infof("停止 %s WebSocket 管理器...", getWsTypeName(wm.wsType)) wm.Stop() return } @@ -315,7 +319,7 @@ func (wm *BinanceWebSocketManager) handleConnectionError(errKey string, err erro } // 记录错误日志 - log.Error("连接 %s WebSocket 时出错: %v, 错误: %v", wm.wsType, wm.apiKey, err) + log.Errorf("连接 %s WebSocket 时出错: %v, 错误: %v", wm.wsType, wm.apiKey, err) } /** @@ -363,34 +367,51 @@ func (wm *BinanceWebSocketManager) connect(ctx context.Context) error { // 连接成功,更新连接时间和ping时间 wm.ConnectTime = time.Now() wm.lastPingTime.Store(time.Now()) - log.Info(fmt.Sprintf("已连接到 Binance %s WebSocket【%s】 key:%s", getWsTypeName(wm.wsType), wm.apiKey, listenKey)) + setLastTime(wm) + log.Infof("已连接到 Binance %s WebSocket【%s】 key:%s", getWsTypeName(wm.wsType), wm.apiKey, listenKey) + + // 设置读超时 + var readTimeout time.Duration + if wm.wsType == 0 { + readTimeout = 2 * time.Minute + } else { + readTimeout = 4 * time.Minute + } + _ = wm.ws.SetReadDeadline(time.Now().Add(readTimeout)) // Ping处理 wm.ws.SetPingHandler(func(appData string) error { - log.Info(fmt.Sprintf("收到 wstype: %v key:%s Ping 消息【%s】", wm.wsType, wm.apiKey, appData)) + log.Infof("收到 wstype: %d key:%s Ping 消息【%s】", wm.wsType, wm.apiKey, appData) for x := 0; x < 5; x++ { if err := wm.ws.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(time.Second*10)); err != nil { - log.Error("binance 回应pong失败 次数:", strconv.Itoa(x), " err:", err) + log.Errorf("binance 回应pong失败 次数:%d err:%v", x, err) time.Sleep(time.Second * 1) continue } break } - // 更新ping时间和Redis记录 + // 更新ping时间和Redis记录,并刷新读超时 wm.lastPingTime.Store(time.Now()) setLastTime(wm) + _ = wm.ws.SetReadDeadline(time.Now().Add(readTimeout)) return nil }) - // 启动各种协程 + // Pong处理:收到服务端Pong时刷新心跳与读超时 + wm.ws.SetPongHandler(func(appData string) error { + wm.lastPingTime.Store(time.Now()) + setLastTime(wm) + _ = wm.ws.SetReadDeadline(time.Now().Add(readTimeout)) + return nil + }) + + // 启动必要协程(避免在此处重复启动重连与假死检测) utility.SafeGo(func() { wm.startListenKeyRenewal2(ctx) }) utility.SafeGo(func() { wm.readMessages(ctx) }) - utility.SafeGo(func() { wm.handleReconnect(ctx) }) utility.SafeGo(func() { wm.startPingLoop(ctx) }) - utility.SafeGo(func() { wm.startDeadCheck(ctx) }) // 连接成功后立即启动假死检测 - utility.SafeGo(func() { wm.start24HourReconnectTimer(ctx) }) // 启动24小时强制重连 + utility.SafeGo(func() { wm.start24HourReconnectTimer(ctx) }) // 启动23小时强制重连 return nil } @@ -398,7 +419,7 @@ func (wm *BinanceWebSocketManager) connect(ctx context.Context) error { // ReplaceConnection 创建新连接并关闭旧连接,实现无缝连接替换 func (wm *BinanceWebSocketManager) ReplaceConnection() error { wm.mu.Lock() - if wm.isStopped { + if wm.isStopped.Load() { wm.mu.Unlock() return errors.New("WebSocket 已停止") } @@ -430,7 +451,7 @@ func (wm *BinanceWebSocketManager) ReplaceConnection() error { // 设置 ping handler newConn.SetPingHandler(func(appData string) error { - log.Infof("收到 Ping(新连接) key:%s msg:%s", wm.apiKey, appData) + // log.Infof("收到 Ping(新连接) key:%s msg:%s", wm.apiKey, appData) for x := 0; x < 5; x++ { if err := newConn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(10*time.Second)); err != nil { log.Errorf("Pong 失败 %d 次 err:%v", x, err) @@ -456,7 +477,6 @@ func (wm *BinanceWebSocketManager) ReplaceConnection() error { // 步骤 4:启动新连接协程 go wm.startListenKeyRenewal2(newCtx) go wm.readMessages(newCtx) - go wm.handleReconnect(newCtx) go wm.startPingLoop(newCtx) // go wm.startDeadCheck(newCtx) @@ -529,8 +549,12 @@ func (wm *BinanceWebSocketManager) getListenKey() (string, error) { return "", err } - if listenKey, ok := dataMap["listenKey"]; ok { - return listenKey.(string), nil + if v, ok := dataMap["listenKey"]; ok { + s, ok2 := v.(string) + if !ok2 || s == "" { + return "", errors.New("listenKey 类型错误或为空") + } + return s, nil } return "", errors.New("listenKey 不存在") @@ -544,24 +568,24 @@ func (wm *BinanceWebSocketManager) readMessages(ctx context.Context) { case <-ctx.Done(): return default: - if wm.isStopped { + if wm.isStopped.Load() { return } _, msg, err := wm.ws.ReadMessage() if err != nil { // 检查是否是由于我们主动关闭连接导致的错误 - if wm.isStopped { + if wm.isStopped.Load() { log.Infof("WebSocket read loop gracefully stopped for key: %s", wm.apiKey) return } // 如果不是主动关闭,再判断是否是连接关闭错误,并触发重连 if strings.Contains(err.Error(), "websocket: close") { - log.Error("WebSocket connection closed unexpectedly, triggering reconnect for key: %s. Error: %v", wm.apiKey, err) + log.Errorf("WebSocket connection closed unexpectedly, triggering reconnect for key: %s. Error: %v", wm.apiKey, err) wm.triggerReconnect(false) } else { - log.Error("Error reading message for key: %s. Error: %v", wm.apiKey, err) + log.Errorf("Error reading message for key: %s. Error: %v", wm.apiKey, err) } return // 任何错误发生后都应退出读取循环 } @@ -611,11 +635,11 @@ func (wm *BinanceWebSocketManager) startPingLoop(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - if wm.isStopped { + if wm.isStopped.Load() { return } if err := wm.sendPing(); err != nil { - log.Error("主动 Ping Binance 失败:", err) + log.Errorf("主动 Ping Binance 失败: %v", err) wm.triggerReconnect(false) return // 退出循环,等待重连 } @@ -623,24 +647,6 @@ func (wm *BinanceWebSocketManager) startPingLoop(ctx context.Context) { } } -// 定期删除listenkey 并重启ws -// func (wm *BinanceWebSocketManager) startListenKeyRenewal(ctx context.Context, listenKey string) { -// time.Sleep(30 * time.Minute) - -// select { -// case <-ctx.Done(): -// return -// default: -// if err := wm.deleteListenKey(listenKey); err != nil { -// log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err) -// } else { -// log.Debug("Successfully delete listenKey") -// wm.triggerReconnect() -// } -// } - -// } - // 定时续期 func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) { ticker := time.NewTicker(30 * time.Minute) @@ -652,12 +658,12 @@ func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) { for { select { case <-ticker.C: - if wm.isStopped { + if wm.isStopped.Load() { return } if err := wm.renewListenKey(wm.listenKey); err != nil { - log.Error("Failed to renew listenKey: ,type:%v key: %s", wm.wsType, wm.apiKey, err) + log.Errorf("Failed to renew listenKey: ,type:%v key: %s err:%v", wm.wsType, wm.apiKey, err) } case <-ctx.Done(): return @@ -665,36 +671,6 @@ func (wm *BinanceWebSocketManager) startListenKeyRenewal2(ctx context.Context) { } } -/* -删除listenkey -*/ -// func (wm *BinanceWebSocketManager) deleteListenKey(listenKey string) error { -// client, err := wm.createBinanceClient() -// if err != nil { -// return err -// } - -// var resp []byte - -// switch wm.wsType { -// case 0: -// path := fmt.Sprintf("/api/v3/userDataStream") -// params := map[string]interface{}{ -// "listenKey": listenKey, -// } -// resp, _, err = client.SendSpotRequestByKey(path, "DELETE", params) - -// log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp))) -// case 1: -// resp, _, err = client.SendFuturesRequestByKey("/fapi/v1/listenKey", "DELETE", nil) -// log.Debug(fmt.Sprintf("deleteListenKey resp: %s", string(resp))) -// default: -// return errors.New("unknown ws type") -// } - -// return err -// } - func (wm *BinanceWebSocketManager) renewListenKey(listenKey string) error { client, err := wm.createBinanceClient() if err != nil { @@ -728,19 +704,6 @@ func getWsTypeName(wsType int) string { return "unknown" } } -func getUUID() string { - return fmt.Sprintf("%s-%s-%s-%s-%s", randomHex(8), randomHex(4), randomHex(4), randomHex(4), randomHex(12)) -} - -func randomHex(n int) string { - rand.New(rand.NewSource(time.Now().UnixNano())) - hexChars := "0123456789abcdef" - bytes := make([]byte, n) - for i := 0; i < n; i++ { - bytes[i] = hexChars[rand.Intn(len(hexChars))] - } - return string(bytes) -} /** * 启动24小时强制重连定时器 @@ -748,8 +711,9 @@ func randomHex(n int) string { */ func (wm *BinanceWebSocketManager) start24HourReconnectTimer(ctx context.Context) { // Binance要求不到24小时就需要主动断开重连 - // 设置为23小时50分钟,留出一些缓冲时间 - duration := 23*time.Hour + 50*time.Minute + // 设置为23小时,留出一些缓冲时间 + // duration := 23 * time.Hour + duration := 10 * time.Minute wm.forceReconnectTimer = time.NewTimer(duration) defer func() { @@ -762,8 +726,8 @@ func (wm *BinanceWebSocketManager) start24HourReconnectTimer(ctx context.Context case <-ctx.Done(): return case <-wm.forceReconnectTimer.C: - if !wm.isStopped { - log.Warnf("24小时强制重连触发 key:%s wsType:%v", wm.apiKey, wm.wsType) + if !wm.isStopped.Load() { + log.Warnf("23小时强制重连触发 key:%s wsType:%v", wm.apiKey, wm.wsType) wm.triggerReconnect(true) } } @@ -783,7 +747,7 @@ func (wm *BinanceWebSocketManager) startDeadCheck(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - if wm.isStopped { + if wm.isStopped.Load() { return } wm.DeadCheck() @@ -845,8 +809,3 @@ func (wm *BinanceWebSocketManager) DeadCheck() { wm.triggerReconnect(true) } } - -/** - * 优化的重连处理逻辑 - * @param ctx 上下文 - */ diff --git a/services/excservice/binancews.go b/services/excservice/binancews.go index c5f46be..a46cc72 100644 --- a/services/excservice/binancews.go +++ b/services/excservice/binancews.go @@ -97,26 +97,31 @@ func (bnWs *BinanceWs) Subscribe(streamName string, tradeSet models.TradeSet, ca } func (bnWs *BinanceWs) exitHandler(c *WsConn) { - pingTicker := time.NewTicker(1 * time.Minute) - pongTicker := time.NewTicker(30 * time.Second) - defer func() { - pingTicker.Stop() - pongTicker.Stop() - c.CloseWs() + // exitHandler 维护对底层连接的 Ping/Pong 心跳发送 + // 修复点:监听 c.close 通道,当连接关闭时退出循环,防止 goroutine 长期驻留导致内存泄漏 + pingTicker := time.NewTicker(1 * time.Minute) + pongTicker := time.NewTicker(30 * time.Second) + defer func() { + pingTicker.Stop() + pongTicker.Stop() + c.CloseWs() if err := recover(); err != nil { fmt.Printf("CloseWs, panic: %s\r\n", err) } }() - for { - select { - case t := <-pingTicker.C: - c.SendPingMessage([]byte(strconv.Itoa(int(t.UnixNano() / int64(time.Millisecond))))) - case t := <-pongTicker.C: - c.SendPongMessage([]byte(strconv.Itoa(int(t.UnixNano() / int64(time.Millisecond))))) - } - } + for { + select { + case <-c.close: + // 连接关闭,退出心跳协程 + return + case t := <-pingTicker.C: + c.SendPingMessage([]byte(strconv.Itoa(int(t.UnixNano() / int64(time.Millisecond))))) + case t := <-pongTicker.C: + c.SendPongMessage([]byte(strconv.Itoa(int(t.UnixNano() / int64(time.Millisecond))))) + } + } } func parseJsonToMap(msg []byte) (map[string]interface{}, error) { diff --git a/services/excservice/websocket.go b/services/excservice/websocket.go index 2601bdb..31df371 100644 --- a/services/excservice/websocket.go +++ b/services/excservice/websocket.go @@ -262,11 +262,13 @@ func (ws *WsConn) reconnect() { } } +// writeRequest 负责写出文本/控制帧以及心跳发送 +// 修复点:在函数退出时停止 heartTimer,避免底层定时器资源长时间存留 func (ws *WsConn) writeRequest() { - var ( - heartTimer *time.Timer - err error - ) + var ( + heartTimer *time.Timer + err error + ) if ws.HeartbeatIntervalTime == 0 { heartTimer = time.NewTimer(time.Hour) @@ -274,31 +276,37 @@ func (ws *WsConn) writeRequest() { heartTimer = time.NewTimer(ws.HeartbeatIntervalTime) } - for { - select { - case <-ws.close: - log.Info("[ws][" + ws.WsUrl + "] close websocket , exiting write message goroutine.") - return - case d := <-ws.writeBufferChan: - err = ws.c.WriteMessage(websocket.TextMessage, d) - case d := <-ws.pingMessageBufferChan: - err = ws.c.WriteMessage(websocket.PingMessage, d) - case d := <-ws.pongMessageBufferChan: - err = ws.c.WriteMessage(websocket.PongMessage, d) - case d := <-ws.closeMessageBufferChan: - err = ws.c.WriteMessage(websocket.CloseMessage, d) - case <-heartTimer.C: - if ws.HeartbeatIntervalTime > 0 { - err = ws.c.WriteMessage(websocket.TextMessage, ws.HeartbeatData()) - heartTimer.Reset(ws.HeartbeatIntervalTime) - } - } + defer func() { + if heartTimer != nil { + heartTimer.Stop() + } + }() - if err != nil { - log.Info("[ws][" + ws.WsUrl + "] write message " + err.Error()) - //time.Sleep(time.Second) - } - } + for { + select { + case <-ws.close: + log.Info("[ws][" + ws.WsUrl + "] close websocket , exiting write message goroutine.") + return + case d := <-ws.writeBufferChan: + err = ws.c.WriteMessage(websocket.TextMessage, d) + case d := <-ws.pingMessageBufferChan: + err = ws.c.WriteMessage(websocket.PingMessage, d) + case d := <-ws.pongMessageBufferChan: + err = ws.c.WriteMessage(websocket.PongMessage, d) + case d := <-ws.closeMessageBufferChan: + err = ws.c.WriteMessage(websocket.CloseMessage, d) + case <-heartTimer.C: + if ws.HeartbeatIntervalTime > 0 { + err = ws.c.WriteMessage(websocket.TextMessage, ws.HeartbeatData()) + heartTimer.Reset(ws.HeartbeatIntervalTime) + } + } + + if err != nil { + log.Info("[ws][" + ws.WsUrl + "] write message " + err.Error()) + //time.Sleep(time.Second) + } + } } func (ws *WsConn) Subscribe(subEvent interface{}) error { diff --git a/services/orderservice/order_service.go b/services/orderservice/order_service.go index 9ef35e6..d734b27 100644 --- a/services/orderservice/order_service.go +++ b/services/orderservice/order_service.go @@ -3,11 +3,13 @@ package orderservice import ( "fmt" "go-admin/app/admin/models" - "go-admin/app/admin/service/dto" + "go-admin/common/const/rediskey" + "go-admin/common/helper" "go-admin/pkg/utility" "go-admin/pkg/utility/snowflakehelper" "time" + "github.com/go-admin-team/go-admin-core/logger" "github.com/go-admin-team/go-admin-core/sdk/service" "github.com/jinzhu/copier" "github.com/shopspring/decimal" @@ -85,17 +87,24 @@ func (e *OrderService) CreateReduceReduceOrder(db *gorm.DB, pid int, price, num } // 触发主单失败 -func (e *OrderService) ErrorTrigger(preOrder *dto.PreOrderRedisList, errData string) error { - // var cacheOrderKey string +// preOrderId 主单id +// preOrderItem 触发单缓存内容 +func (e *OrderService) ErrorTrigger(preOrderId, symbolType int, exchangeType, preOrderItem string, errData string) error { + var cacheOrderKey string - // if preOrder.OrderType == 1 { - // cacheOrderKey = fmt.Sprintf(rediskey.PreSpotOrderList, preOrder.ExchangeType) - // } else { - // cacheOrderKey = fmt.Sprintf(rediskey.PreFutOrderList, preOrder.ExchangeType) - // } + if symbolType == 1 { + cacheOrderKey = fmt.Sprintf(rediskey.PreSpotOrderList, exchangeType) + } else { + cacheOrderKey = fmt.Sprintf(rediskey.PreFutOrderList, exchangeType) + } + + if _, err := helper.DefaultRedis.LRem(cacheOrderKey, preOrderItem); err != nil { + logger.Errorf("删除缓存减仓单失败:key:%s item:%s err:%v", cacheOrderKey, preOrderItem, err) + return err + } if err := e.Orm.Model(&models.LinePreOrder{}). - Where("id = ? and status =1", preOrder.Id). + Where("id = ? and status =0", preOrderId). Updates(map[string]interface{}{ "status": 2, "desc": errData,