package jobs import ( "context" "errors" "fmt" "go-admin/app/admin/models" "go-admin/app/admin/service" "go-admin/app/admin/service/appservice" "go-admin/app/admin/service/dto" "go-admin/common/const/rediskey" "go-admin/common/global" "go-admin/common/helper" "go-admin/pkg/emailhelper" "go-admin/pkg/utility" "go-admin/pkg/utility/snowflakehelper" "go-admin/services/binanceservice" "go-admin/services/cacheservice" "go-admin/services/fileservice" "io" "net/http" "os" "os/exec" "slices" "strconv" "strings" "time" "github.com/bytedance/sonic" "github.com/jinzhu/copier" "github.com/shopspring/decimal" "gorm.io/driver/mysql" "github.com/go-admin-team/go-admin-core/sdk" "gorm.io/gorm" "github.com/go-admin-team/go-admin-core/logger" ) type InitFuturesSymbol struct { } type InitSpotSymbol struct { } type ClearLogJob struct { } type AutoPlaceOrder struct { } type LimitOrderTimeoutDuration struct { } type ListenSymbol struct { } // 会员过期任务 type MemberExpirationJob struct{} // 初始化合约交易对 func (t InitFuturesSymbol) Exec(arg interface{}) error { str := time.Now().Format(timeFormat) + " [INFO] JobCore InitFuturesSymbol exec success" defer logger.Info(str) db := getDefaultDb() symbolService := service.LineSymbol{} symbolService.Orm = db //重置合约交易对 symbolService.ResetFuturesSymbol() return nil } func getDefaultDb() *gorm.DB { dbs := sdk.Runtime.GetDb() var db *gorm.DB for _, item := range dbs { db = item break } return db } // 初始化现货交易对 func (t InitSpotSymbol) Exec(arg interface{}) error { str := time.Now().Format(timeFormat) + " [INFO] JobCore InitSpotSymbol exec success" defer logger.Info(str) dbs := sdk.Runtime.GetDb() var db *gorm.DB for _, item := range dbs { db = item break } symbolService := service.LineSymbol{} symbolService.Orm = db //重置现货交易对 symbolService.ResetSpotSymbol() return nil } // 删除过期日志 func (t ClearLogJob) Exec(arg interface{}) error { str := time.Now().Format(timeFormat) + " [INFO] JobCore ClearLogJob exec success" defer logger.Info(str) var db *gorm.DB for _, item := range sdk.Runtime.GetDb() { db = item break } fileservice.ClearLogs(db) return nil } // Exec 执行自动下单 func (t AutoPlaceOrder) Exec(arg interface{}) error { str := time.Now().Format(timeFormat) + " [INFO] JobCore ClearLogJob exec success" defer logger.Info(str) var db *gorm.DB for _, item := range sdk.Runtime.GetDb() { db = item break } var templateLogs []models.LineOrderTemplateLogs db.Model(&models.LineOrderTemplateLogs{}).Where("switch = '1'").Find(&templateLogs) if len(templateLogs) > 0 { ids := make([]string, len(templateLogs)) for _, log := range templateLogs { ids = append(ids, strconv.Itoa(log.Id)) } req := dto.QuickAddPreOrderReq{Ids: strings.Join(ids, ",")} preOrderService := service.LinePreOrder{} preOrderService.Orm = db errs := make([]error, 0) errStr := make([]string, 0) err := preOrderService.QuickAddPreOrder(&req, nil, 0, &errs) if err != nil { return err } if len(errs) > 0 { //e.Logger.Error(err) for _, err2 := range errs { errStr = append(errStr, err2.Error()) } return errors.New(strings.Join(errStr, ",")) } } return nil } func (t LimitOrderTimeoutDuration) Exec(arg interface{}) error { str := time.Now().Format(timeFormat) + " [INFO] JobCore ClearLogJob exec success" defer logger.Info(str) var db *gorm.DB for _, item := range sdk.Runtime.GetDb() { db = item break } var req = new(dto.SysConfigByKeyReq) var resp = new(dto.GetSysConfigByKEYForServiceResp) req.ConfigKey = "limitOrderTimeoutDuration" serviceConfig := service.SysConfig{} serviceConfig.Orm = db serviceConfig.GetWithKey(req, resp) if resp.ConfigValue == "" { return nil } lock := helper.NewRedisLock(rediskey.JobReOrderTrigger, 20, 5, 100*time.Millisecond) if ok, err := lock.AcquireWait(context.Background()); err != nil { logger.Error("获取锁失败", err) return nil } else if ok { defer lock.Release() limitOrderTimeoutDuration := utility.StringAsInt64(resp.ConfigValue) orders := make([]models.LinePreOrder, 0) err := db.Model(&models.LinePreOrder{}). Where("status = '5' AND main_order_type = 'LIMIT' AND order_type in ('4') AND updated_at < ?", time.Now().Add(-time.Duration(limitOrderTimeoutDuration)*time.Second)). Preload("Childs"). Find(&orders).Error if err != nil { return err } spotApi := binanceservice.SpotRestApi{} futApi := binanceservice.FutRestApi{} for _, order := range orders { var apiUserinfo models.LineApiUser db.Model(&models.LineApiUser{}).Where("id = ?", order.ApiId).Find(&apiUserinfo) //现货 if order.SymbolType == global.SYMBOL_SPOT { if order.ExchangeType == global.EXCHANGE_BINANCE { err := t.ReSpotOrderPlace(db, order, apiUserinfo, spotApi) if err != nil { continue } } } //合约 if order.SymbolType == global.SYMBOL_FUTURES { if order.ExchangeType == global.EXCHANGE_BINANCE { err := t.ReFutOrderPlace(db, order, apiUserinfo, futApi) if err != nil { continue } } } } } return nil } // ReSpotOrderPlace 重下现货市价单 func (t LimitOrderTimeoutDuration) ReSpotOrderPlace(db *gorm.DB, order models.LinePreOrder, apiUserinfo models.LineApiUser, spotApi binanceservice.SpotRestApi) error { var err error for i := 0; i < 3; i++ { err = spotApi.CancelOpenOrderByOrderSn(apiUserinfo, order.Symbol, order.OrderSn) if err == nil || strings.Contains(err.Error(), "该交易对没有订单") { err = nil break } } if err != nil { logger.Error(fmt.Sprintf("取消现货委托失败:order_sn:%s err:%+v", order.OrderSn, err)) return err } else { var remainingQuantity decimal.Decimal spotOrder, err := spotApi.GetOrderByOrderSnLoop(order.Symbol, order.OrderSn, apiUserinfo, 4) tradeSet, _ := cacheservice.GetTradeSet(global.EXCHANGE_BINANCE, order.Symbol, 0) if err == nil { origQty := utility.StrToDecimal(spotOrder.OrigQty) excuteQty := utility.StrToDecimal(spotOrder.ExecutedQty) remainingQuantity = origQty.Sub(excuteQty).Abs().Truncate(int32(tradeSet.AmountDigit)) } if remainingQuantity.Cmp(decimal.Zero) <= 0 { logger.Errorf("剩余数量为0 无需重新下市价单mainid:%v", order.MainId) return nil } maxMarketQty := decimal.NewFromFloat(tradeSet.MarketMaxQty).Truncate(int32(tradeSet.AmountDigit)) // if config.ApplicationConfig.Mode == "dev" { // maxMarketQty = decimal.NewFromFloat(10) // } if remainingQuantity.Cmp(maxMarketQty) > 0 && maxMarketQty.Cmp(decimal.Zero) > 0 { multiple := remainingQuantity.Div(maxMarketQty).Abs().IntPart() remainder := remainingQuantity.Mod(maxMarketQty) saveOrders := make([]models.LinePreOrder, 0) desc := fmt.Sprintf("取消限价单,重下市价单 源订单号:%s", order.OrderSn) // 创建 multiple 个订单 for i := 0; i < int(multiple); i++ { saveOrders = append(saveOrders, createNewOrder(order, maxMarketQty, desc)) } // 处理余数 if remainder.Cmp(decimal.Zero) > 0 { saveOrders = append(saveOrders, createNewOrder(order, remainder.Truncate(int32(tradeSet.AmountDigit)), desc)) } if err := db.Create(&saveOrders).Error; err != nil { logger.Errorf("市价订单拆分后保存失败,err:", err) return err } for index := range saveOrders { newNum := utility.StrToDecimal(saveOrders[index].Num) if err := newSpotOrderClosePosition(saveOrders[index], newNum, spotApi, db); err != nil { logger.Errorf("市价订单拆分后保存失败,err:", err) } if index == len(saveOrders)-1 { orderCopy := saveOrders[index] // 复制数据 binanceservice.HandleSpotMarketSliceTakeProfit(db, orderCopy, order.Id, apiUserinfo, tradeSet) } } } else { newClientOrderId := snowflakehelper.GetOrderId() order.Num = remainingQuantity.Truncate(int32(tradeSet.AmountDigit)).String() order.Desc = fmt.Sprintf("取消限价单,重下市价单源订单号:%s ", order.OrderSn) order.OrderSn = utility.Int64ToString(newClientOrderId) order.MainOrderType = "MARKET" err = db.Model(&order).Updates(map[string]interface{}{"desc": order.Desc, "order_sn": order.OrderSn, "main_order_type": order.MainOrderType}).Error if err != nil { logger.Error(fmt.Sprintf("生成新市价单失败 err:%+v", err)) return err } if err := newSpotOrderClosePosition(order, remainingQuantity, spotApi, db); err != nil { return err } } } return nil } // 现货市价单平仓 func newSpotOrderClosePosition(order models.LinePreOrder, remainingQuantity decimal.Decimal, spotApi binanceservice.SpotRestApi, db *gorm.DB) error { params := binanceservice.OrderPlacementService{ ApiId: order.ApiId, Symbol: order.Symbol, Side: order.Site, Type: "MARKET", TimeInForce: "GTC", Price: utility.StringToDecimal(order.Price), StopPrice: utility.StrToDecimal(order.Price), Quantity: remainingQuantity, NewClientOrderId: order.OrderSn, } if err := spotApi.OrderPlaceLoop(db, params, 3); err != nil { logger.Error(fmt.Sprintf("重新下市价单失败 err:%+v", err)) err := db.Model(&order).Updates(map[string]interface{}{"status": "2", "desc": order.Desc + "err:" + err.Error()}).Error if err != nil { logger.Error("下单失败后修改订单失败") return err } } return nil } // ReFutOrderPlace 重下合约市价单 func (t LimitOrderTimeoutDuration) ReFutOrderPlace(db *gorm.DB, order models.LinePreOrder, apiUserinfo models.LineApiUser, futApi binanceservice.FutRestApi) error { var err error for i := 0; i < 3; i++ { err := futApi.CancelFutOrder(apiUserinfo, order.Symbol, order.OrderSn) if err == nil || strings.Contains(err.Error(), "该交易对没有订单") { err = nil break } } if err != nil { logger.Error(fmt.Sprintf("取消现货委托失败:order_sn:%s err:%+v", order.OrderSn, err)) return err } else { var remainingQuantity decimal.Decimal spotOrder, err := futApi.GetOrderByOrderSnLoop(order.Symbol, order.OrderSn, apiUserinfo, 4) tradeSet, _ := cacheservice.GetTradeSet(global.EXCHANGE_BINANCE, order.Symbol, 1) if err == nil { origQty := utility.StrToDecimal(spotOrder.OrigQty) excuteQty := utility.StrToDecimal(spotOrder.ExecutedQty) remainingQuantity = origQty.Sub(excuteQty).Abs().Truncate(int32(tradeSet.AmountDigit)) } if remainingQuantity.Cmp(decimal.Zero) <= 0 { logger.Errorf("剩余数量为0 无需重新下市价单mainid:%v", order.MainId) return nil } qty := decimal.NewFromFloat(tradeSet.MarketMaxQty) maxMarketQty := qty.Truncate(int32(tradeSet.AmountDigit)) // if config.ApplicationConfig.Mode == "dev" { // maxMarketQty = decimal.NewFromFloat(10) // } //数量超过最大数量,则拆单 if remainingQuantity.Cmp(maxMarketQty) > 0 && maxMarketQty.Cmp(decimal.Zero) > 0 { multiple := remainingQuantity.Div(maxMarketQty).Abs().IntPart() remainder := remainingQuantity.Mod(maxMarketQty) saveOrders := make([]models.LinePreOrder, 0) desc := fmt.Sprintf("取消限价单,重下市价单 源订单号:%s", order.OrderSn) // 创建 multiple 个订单 for i := 0; i < int(multiple); i++ { saveOrders = append(saveOrders, createNewOrder(order, maxMarketQty, desc)) } // 处理余数 if remainder.Cmp(decimal.Zero) > 0 { saveOrders = append(saveOrders, createNewOrder(order, remainder.Truncate(int32(tradeSet.AmountDigit)), desc)) } if err := db.Create(&saveOrders).Error; err != nil { logger.Errorf("市价订单拆分后保存失败,err:", err) return err } for index := range saveOrders { newNum := utility.StrToDecimal(saveOrders[index].Num) if err := newOrderClosePosition(saveOrders[index], futApi, newNum.Truncate(int32(tradeSet.AmountDigit)), apiUserinfo, db); err != nil { logger.Errorf("市价单拆分后下单失败 orderSn:%s, err:%s", saveOrders[index].OrderSn, err) } if index == len(saveOrders)-1 { orderCopy := saveOrders[index] // 复制数据 binanceservice.HandleMarketSliceTakeProfit(db, orderCopy, order.Id, apiUserinfo, tradeSet) } } } else { newClientOrderId := snowflakehelper.GetOrderId() order.Num = remainingQuantity.Truncate(int32(tradeSet.AmountDigit)).String() order.Desc = fmt.Sprintf("取消限价单,重下市价单 源订单号:%s", order.OrderSn) order.OrderSn = utility.Int64ToString(newClientOrderId) err = db.Model(&order).Updates(map[string]interface{}{"desc": order.Desc, "order_sn": order.OrderSn}).Error if err != nil { logger.Error(fmt.Sprintf("生成合约新市价单失败 err:%+v", err)) return err } err = newOrderClosePosition(order, futApi, remainingQuantity, apiUserinfo, db) if err != nil { return err } } } return err } // 复制订单 func createNewOrder(order models.LinePreOrder, num decimal.Decimal, desc string) models.LinePreOrder { newOrder := models.LinePreOrder{} copier.Copy(&newOrder, order) newOrder.Id = 0 newOrder.OrderSn = utility.Int64ToString(snowflakehelper.GetOrderId()) newOrder.Num = num.String() newOrder.Desc = desc newOrder.MainOrderType = "MARKET" newOrder.Status = 0 // 重新创建 Childs,并正确赋值 `Pid` var newChilds []models.LinePreOrder for _, child := range order.Childs { newChild := child newChild.Id = 0 newChild.Pid = 0 newChild.OrderSn = utility.Int64ToString(snowflakehelper.GetOrderId()) newChild.CreatedAt = time.Now() newChilds = append(newChilds, newChild) } newOrder.Childs = newChilds // 重新赋值子订单,避免浅拷贝问题 return newOrder } // 新市价单 func newOrderClosePosition(order models.LinePreOrder, futApi binanceservice.FutRestApi, remainingQuantity decimal.Decimal, apiUserinfo models.LineApiUser, db *gorm.DB) error { var positionSide string if order.Site == "BUY" { positionSide = "SHORT" } else { positionSide = "LONG" } if err := futApi.ClosePositionLoop(order.Symbol, order.OrderSn, remainingQuantity, order.Site, positionSide, apiUserinfo, "MARKET", "0", decimal.Zero, 3); err != nil { logger.Error(fmt.Sprintf("重新下合约市价单失败 err:%+v", err)) err := db.Model(&order).Updates(map[string]interface{}{"status": "2", "desc": order.Desc + " err:" + err.Error()}).Error if err != nil { logger.Error("下单失败后修改订单失败") return err } } return nil } func (l ListenSymbol) Exec(arg interface{}) error { str := time.Now().Format(timeFormat) + " [INFO] JobCore ClearLogJob exec success" defer logger.Info(str) var db *gorm.DB for _, item := range sdk.Runtime.GetDb() { db = item break } if db == nil { dsn := "root:root@tcp(192.168.1.12:3306)/go_exchange_single?charset=utf8mb4&parseTime=True&loc=Local&timeout=1000ms" db, _ = gorm.Open(mysql.Open(dsn), &gorm.Config{}) } var req = new(dto.SysConfigByKeyReq) var configResp = new(dto.GetSysConfigByKEYForServiceResp) req.ConfigKey = "EmailAddress" serviceConfig := service.SysConfig{} serviceConfig.Orm = db serviceConfig.GetWithKey(req, configResp) if configResp.ConfigValue == "" { return nil } symbols := make([]models.SpiderListenSymbol, 0) err := db.Model(&models.SpiderListenSymbol{}).Find(&symbols).Error if err != nil { logger.Error("获取监听交易对失败") return err } var set []string for _, symbol := range symbols { set = append(set, symbol.Coin+symbol.Currency) } if len(set) == 0 { return nil } aveToken, _ := helper.DefaultRedis.GetString(rediskey.AveRequestToken) if aveToken == "" { aveToken, err = l.GetAveRequestId() if err != nil { logger.Error("获取请求id失败err:", err) return err } } chainSlice := []string{"solana", "bsc", "eth"} for _, chain := range chainSlice { symbols, err := l.GetAveLastSymbol(chain, aveToken) if err != nil { continue } for _, symbol := range symbols.Data.Data { if slices.Contains(set, symbol.Token0Symbol+symbol.Token1Symbol) || slices.Contains(set, symbol.Token1Symbol+symbol.Token0Symbol) { //发送邮箱 emails := strings.Split(configResp.ConfigValue, ",") for _, email := range emails { if slices.Contains(set, symbol.Token0Symbol+symbol.Token1Symbol) { emailhelper.SendNoticeEmail(email, chain, symbol.Token0Symbol+"/"+symbol.Token1Symbol, symbol.TargetToken) db.Model(&models.SpiderListenSymbol{}).Where("coin = ? AND currency = ?", symbol.Token0Symbol, symbol.Token1Symbol).Delete(&models.SpiderListenSymbol{}) } else { emailhelper.SendNoticeEmail(email, chain, symbol.Token1Symbol+"/"+symbol.Token0Symbol, symbol.TargetToken) db.Model(&models.SpiderListenSymbol{}).Where("coin = ? AND currency = ?", symbol.Token1Symbol, symbol.Token0Symbol).Delete(&models.SpiderListenSymbol{}) } } continue } } } return nil } // GetAveRequestId 获取请求id func (l ListenSymbol) GetAveRequestId() (aveToken string, err error) { execPath, err := os.Getwd() // 获取可执行文件所在的目录 execDir := fmt.Sprintf("%s/config/ave.js", execPath) _, err = os.Stat(execDir) if err != nil { logger.Error("可执行的js 文件不存在") return aveToken, err } command := exec.Command("node", execDir) output, err := command.CombinedOutput() if err != nil { logger.Error("执行出错:err", err) return aveToken, err } client := &http.Client{} m := map[string]string{ "request_id": string(output), } marshal, _ := sonic.Marshal(m) var data = strings.NewReader(string(marshal)) req, err := http.NewRequest("POST", "https://febweb002.com/v1api/v1/captcha/requestToken", data) if err != nil { return aveToken, err } req.Header.Set("accept", "*/*") req.Header.Set("accept-language", "zh-CN,zh;q=0.9") req.Header.Set("ave-platform", "web") req.Header.Set("content-type", "application/json") req.Header.Set("origin", "https://ave.ai") req.Header.Set("priority", "u=1, i") req.Header.Set("referer", "https://ave.ai/") req.Header.Set("sec-ch-ua", `"Not A(Brand";v="8", "Chromium";v="132", "Google Chrome";v="132"`) req.Header.Set("sec-ch-ua-mobile", "?0") req.Header.Set("sec-ch-ua-platform", `"Windows"`) req.Header.Set("sec-fetch-dest", "empty") req.Header.Set("sec-fetch-mode", "cors") req.Header.Set("sec-fetch-site", "cross-site") req.Header.Set("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36") resp, err := client.Do(req) if err != nil { logger.Error("请求失败err:", err) return aveToken, err } defer resp.Body.Close() bodyText, err := io.ReadAll(resp.Body) if err != nil { logger.Error("读取响应失败err:", err) return aveToken, err } var res RequestId sonic.Unmarshal(bodyText, &res) if res.Status == 1 { aveToken = res.Data.Id helper.DefaultRedis.SetStringExpire(rediskey.AveRequestToken, res.Data.Id, time.Duration(60*time.Second)) } return aveToken, err } func (l ListenSymbol) GetAveLastSymbol(chain, token string) (aveLastSymbolResp AveLastSymbolResp, err error) { client := &http.Client{} url := fmt.Sprintf("https://api.agacve.com/v1api/v4/tokens/treasure/list?chain=%s&sort=created_at&sort_dir=desc&pageNO=1&pageSize=50&category=new&refresh_total=0", chain) req, err := http.NewRequest("GET", url, nil) if err != nil { logger.Error("获取最新交易对列表出错err:", err) return AveLastSymbolResp{}, err } req.Header.Set("accept", "application/json, text/plain, */*") req.Header.Set("accept-language", "zh-CN,zh;q=0.9") req.Header.Set("ave-udid", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36--1739784205001--97086a46-bce0-4ca9-8e0e-78f5075be306") req.Header.Set("cache-control", "no-cache") req.Header.Set("lang", "en") req.Header.Set("origin", "https://ave.ai") req.Header.Set("pragma", "no-cache") req.Header.Set("priority", "u=1, i") req.Header.Set("referer", "https://ave.ai/") req.Header.Set("sec-ch-ua", `"Not A(Brand";v="8", "Chromium";v="132", "Google Chrome";v="132"`) req.Header.Set("sec-ch-ua-mobile", "?0") req.Header.Set("sec-ch-ua-platform", `"Windows"`) req.Header.Set("sec-fetch-dest", "empty") req.Header.Set("sec-fetch-mode", "cors") req.Header.Set("sec-fetch-site", "cross-site") req.Header.Set("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36") req.Header.Set("x-auth", token) resp, err := client.Do(req) if err != nil { logger.Error("获取最新交易对列表出错1err:", err) return AveLastSymbolResp{}, err } defer resp.Body.Close() bodyText, err := io.ReadAll(resp.Body) if err != nil { logger.Error("获取最新交易对列表出错2err:", err) return AveLastSymbolResp{}, err } var res AveLastSymbolResp sonic.Unmarshal(bodyText, &res) return res, nil } // 会员过期任务 func (e MemberExpirationJob) Exec(arg interface{}) error { userAppService := appservice.LineUser{} userAppService.Orm = getDefaultDb() err := userAppService.Expire() if err != nil { logger.Error("会员过期任务出错err:", err) return err } return nil }