package jobs import ( "context" "errors" "fmt" "github.com/bytedance/sonic" "go-admin/app/admin/models" "go-admin/app/admin/service" "go-admin/app/admin/service/dto" "go-admin/common/const/rediskey" "go-admin/common/global" "go-admin/common/helper" "go-admin/pkg/utility" "go-admin/pkg/utility/snowflakehelper" "go-admin/services/binanceservice" "go-admin/services/fileservice" "io" "net/http" "os" "os/exec" "path/filepath" "slices" "strconv" "strings" "time" "github.com/shopspring/decimal" "github.com/go-admin-team/go-admin-core/sdk" "gorm.io/gorm" "github.com/go-admin-team/go-admin-core/logger" ) type InitFuturesSymbol struct { } type InitSpotSymbol struct { } type ClearLogJob struct { } type AutoPlaceOrder struct { } type LimitOrderTimeoutDuration struct { } type ListenSymbol struct { } // 初始化合约交易对 func (t InitFuturesSymbol) Exec(arg interface{}) error { str := time.Now().Format(timeFormat) + " [INFO] JobCore InitFuturesSymbol exec success" defer logger.Info(str) dbs := sdk.Runtime.GetDb() var db *gorm.DB for _, item := range dbs { db = item break } symbolService := service.LineSymbol{} symbolService.Orm = db //重置合约交易对 symbolService.ResetFuturesSymbol() return nil } // 初始化现货交易对 func (t InitSpotSymbol) Exec(arg interface{}) error { str := time.Now().Format(timeFormat) + " [INFO] JobCore InitSpotSymbol exec success" defer logger.Info(str) dbs := sdk.Runtime.GetDb() var db *gorm.DB for _, item := range dbs { db = item break } symbolService := service.LineSymbol{} symbolService.Orm = db //重置现货交易对 symbolService.ResetSpotSymbol() return nil } // 删除过期日志 func (t ClearLogJob) Exec(arg interface{}) error { str := time.Now().Format(timeFormat) + " [INFO] JobCore ClearLogJob exec success" defer logger.Info(str) var db *gorm.DB for _, item := range sdk.Runtime.GetDb() { db = item break } fileservice.ClearLogs(db) return nil } // Exec 执行自动下单 func (t AutoPlaceOrder) Exec(arg interface{}) error { str := time.Now().Format(timeFormat) + " [INFO] JobCore ClearLogJob exec success" defer logger.Info(str) var db *gorm.DB for _, item := range sdk.Runtime.GetDb() { db = item break } var templateLogs []models.LineOrderTemplateLogs db.Model(&models.LineOrderTemplateLogs{}).Where("switch = '1'").Find(&templateLogs) if len(templateLogs) > 0 { ids := make([]string, len(templateLogs)) for _, log := range templateLogs { ids = append(ids, strconv.Itoa(log.Id)) } req := dto.QuickAddPreOrderReq{Ids: strings.Join(ids, ",")} preOrderService := service.LinePreOrder{} preOrderService.Orm = db errs := make([]error, 0) errStr := make([]string, 0) err := preOrderService.QuickAddPreOrder(&req, nil, &errs) if err != nil { return err } if len(errs) > 0 { //e.Logger.Error(err) for _, err2 := range errs { errStr = append(errStr, err2.Error()) } return errors.New(strings.Join(errStr, ",")) } } return nil } func (t LimitOrderTimeoutDuration) Exec(arg interface{}) error { str := time.Now().Format(timeFormat) + " [INFO] JobCore ClearLogJob exec success" defer logger.Info(str) var db *gorm.DB for _, item := range sdk.Runtime.GetDb() { db = item break } var req = new(dto.SysConfigByKeyReq) var resp = new(dto.GetSysConfigByKEYForServiceResp) req.ConfigKey = "limitOrderTimeoutDuration" serviceConfig := service.SysConfig{} serviceConfig.Orm = db serviceConfig.GetWithKey(req, resp) if resp.ConfigValue == "" { return nil } lock := helper.NewRedisLock(rediskey.JobReOrderTrigger, 20, 5, 100*time.Millisecond) if ok, err := lock.AcquireWait(context.Background()); err != nil { logger.Error("获取锁失败", err) return nil } else if ok { defer lock.Release() limitOrderTimeoutDuration := utility.StringAsInt64(resp.ConfigValue) orders := make([]models.LinePreOrder, 0) err := db.Model(&models.LinePreOrder{}).Where("status = '5' AND main_order_type = 'LIMIT' AND order_type in ('4') AND updated_at < ?", time.Now().Add(-time.Duration(limitOrderTimeoutDuration)*time.Second)).Find(&orders).Error if err != nil { return err } spotApi := binanceservice.SpotRestApi{} futApi := binanceservice.FutRestApi{} for _, order := range orders { var apiUserinfo models.LineApiUser db.Model(&models.LineApiUser{}).Where("id = ?", order.ApiId).Find(&apiUserinfo) //现货 if order.SymbolType == global.SYMBOL_SPOT { if order.ExchangeType == global.EXCHANGE_BINANCE { err := t.ReSpotOrderPlace(db, order, apiUserinfo, spotApi) if err != nil { continue } } } //合约 if order.SymbolType == global.SYMBOL_FUTURES { if order.ExchangeType == global.EXCHANGE_BINANCE { err := t.ReFutOrderPlace(db, order, apiUserinfo, futApi) if err != nil { continue } } } } } return nil } // ReSpotOrderPlace 重下现货市价单 func (t LimitOrderTimeoutDuration) ReSpotOrderPlace(db *gorm.DB, order models.LinePreOrder, apiUserinfo models.LineApiUser, spotApi binanceservice.SpotRestApi) error { var err error for i := 0; i < 3; i++ { err = spotApi.CancelOpenOrderByOrderSn(apiUserinfo, order.Symbol, order.OrderSn) if err == nil || strings.Contains(err.Error(), "该交易对没有订单") { break } } if err != nil { logger.Error(fmt.Sprintf("取消现货委托失败:order_sn:%s err:%+v", order.OrderSn, err)) return err } else { newClientOrderId := snowflakehelper.GetOrderId() order.Desc = fmt.Sprintf("取消限价单,重下市价单源订单号:%s ", order.OrderSn) order.OrderSn = utility.Int64ToString(snowflakehelper.GetOrderId()) order.MainOrderType = "MARKET" // var newOrder models.LinePreOrder // copier.Copy(&newOrder, order) // newOrder.Id = 0 // newOrder.OrderSn = utility.Int64ToString(newClientOrderId) // newOrder.CreatedAt = time.Now() // newOrder.MainOrderType = "MARKET" // err = db.Model(&models.LinePreOrder{}).Create(&newOrder).Error err := db.Model(&order).Updates(map[string]interface{}{"desc": order.Desc, "order_sn": order.OrderSn, "main_order_type": order.MainOrderType}).Error if err != nil { logger.Error(fmt.Sprintf("生成新市价单失败 err:%+v", err)) return err } params := binanceservice.OrderPlacementService{ ApiId: order.ApiId, Symbol: order.Symbol, Side: order.Site, Type: "MARKET", TimeInForce: "GTC", Price: utility.StringToDecimal(order.Price), StopPrice: utility.StrToDecimal(order.Price), Quantity: utility.StringToDecimal(order.Num), NewClientOrderId: utility.Int64ToString(newClientOrderId), } if err := spotApi.OrderPlace(db, params); err != nil { logger.Error(fmt.Sprintf("重新下市价单失败 err:%+v", err)) err := db.Model(&order).Updates(map[string]interface{}{"status": "2", "desc": order.Desc + "err:" + err.Error()}).Error if err != nil { logger.Error("下单失败后修改订单失败") return err } } } return nil } // ReFutOrderPlace 重下合约市价单 func (t LimitOrderTimeoutDuration) ReFutOrderPlace(db *gorm.DB, order models.LinePreOrder, apiUserinfo models.LineApiUser, futApi binanceservice.FutRestApi) error { var err error for i := 0; i < 3; i++ { err := futApi.CancelFutOrder(apiUserinfo, order.Symbol, order.OrderSn) if err == nil || strings.Contains(err.Error(), "该交易对没有订单") { break } } if err != nil { logger.Error(fmt.Sprintf("取消现货委托失败:order_sn:%s err:%+v", order.OrderSn, err)) return err } else { newClientOrderId := snowflakehelper.GetOrderId() orderType := "MARKET" order.Desc = fmt.Sprintf("取消限价单,重下市价单 源订单号:%s", order.OrderSn) order.OrderSn = utility.Int64ToString(newClientOrderId) // var newOrder models.LinePreOrder // copier.Copy(&newOrder, order) // newOrder.Id = 0 // newOrder.OrderSn = utility.Int64ToString(newClientOrderId) // newOrder.CreatedAt = time.Now() // newOrder.MainOrderType = "MARKET" // err = db.Model(&models.LinePreOrder{}).Create(&newOrder).Error err = db.Model(&order).Updates(map[string]interface{}{"desc": order.Desc, "order_sn": order.OrderSn}).Error if err != nil { logger.Error(fmt.Sprintf("生成合约新市价单失败 err:%+v", err)) return err } if order.OrderType == 4 { orderType = "STOP_MARKET" } params := binanceservice.FutOrderPlace{ ApiId: order.ApiId, Symbol: order.Symbol, Side: order.Site, Quantity: utility.StringToDecimal(order.Num), Price: utility.StringToDecimal(order.Price), SideType: "MARKET", OpenOrder: 0, Profit: decimal.Decimal{}, StopPrice: utility.StringToDecimal(order.Price), OrderType: orderType, NewClientOrderId: utility.Int64ToString(newClientOrderId), } if err := futApi.OrderPlace(db, params); err != nil { logger.Error(fmt.Sprintf("重新下合约市价单失败 err:%+v", err)) err := db.Model(&order).Updates(map[string]interface{}{"status": "2", "desc": order.Desc + " err:" + err.Error()}).Error if err != nil { logger.Error("下单失败后修改订单失败") return err } } } return err } func (l ListenSymbol) Exec(arg interface{}) error { str := time.Now().Format(timeFormat) + " [INFO] JobCore ClearLogJob exec success" defer logger.Info(str) var db *gorm.DB for _, item := range sdk.Runtime.GetDb() { db = item break } set, err := helper.DefaultRedis.GetAllSortSet(rediskey.ListenAveLastSymbol) if err != nil { logger.Error("获取监听交易对失败") return err } fmt.Println(set) //if len(set) == 0 { // return nil //} aveToken, _ := helper.DefaultRedis.GetString(rediskey.AveRequestToken) if aveToken == "" { aveToken, err = l.GetAveRequestId() if err != nil { logger.Error("获取请求id失败err:", err) return err } } chainSlice := []string{"solana", "bsc", "eth"} for _, chain := range chainSlice { symbols, err := l.GetAveLastSymbol(chain, aveToken) if err != nil { continue } for _, symbol := range symbols.Data.Data { if slices.Contains(set, symbol.Token0Symbol+symbol.Token1Symbol) { // 清除数据 //发送邮箱 l.SendEmailNotice(db, symbol.Pair, symbol.Token0Symbol+symbol.Token1Symbol, chain) continue } } } return nil } // GetAveRequestId 获取请求id func (l ListenSymbol) GetAveRequestId() (aveToken string, err error) { execPath, err := os.Getwd() // 获取可执行文件所在的目录 execDir := fmt.Sprintf("%s/config/ave.js", filepath.Dir(filepath.Dir(execPath))) _, err = os.Stat(execDir) if err != nil { logger.Error("可执行的js 文件不存在") return aveToken, err } command := exec.Command("node", execDir) output, err := command.CombinedOutput() if err != nil { logger.Error("执行出错:err", err) return aveToken, err } client := &http.Client{} m := map[string]string{ "request_id": string(output), } marshal, _ := sonic.Marshal(m) var data = strings.NewReader(string(marshal)) req, err := http.NewRequest("POST", "https://febweb002.com/v1api/v1/captcha/requestToken", data) if err != nil { return aveToken, err } req.Header.Set("accept", "*/*") req.Header.Set("accept-language", "zh-CN,zh;q=0.9") req.Header.Set("ave-platform", "web") req.Header.Set("content-type", "application/json") req.Header.Set("origin", "https://ave.ai") req.Header.Set("priority", "u=1, i") req.Header.Set("referer", "https://ave.ai/") req.Header.Set("sec-ch-ua", `"Not A(Brand";v="8", "Chromium";v="132", "Google Chrome";v="132"`) req.Header.Set("sec-ch-ua-mobile", "?0") req.Header.Set("sec-ch-ua-platform", `"Windows"`) req.Header.Set("sec-fetch-dest", "empty") req.Header.Set("sec-fetch-mode", "cors") req.Header.Set("sec-fetch-site", "cross-site") req.Header.Set("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36") resp, err := client.Do(req) if err != nil { logger.Error("请求失败err:", err) return aveToken, err } defer resp.Body.Close() bodyText, err := io.ReadAll(resp.Body) if err != nil { logger.Error("读取响应失败err:", err) return aveToken, err } var res RequestId sonic.Unmarshal(bodyText, &res) if res.Status == 1 { aveToken = res.Data.Id helper.DefaultRedis.SetStringExpire(rediskey.AveRequestToken, res.Data.Id, time.Duration(60*time.Second)) } return aveToken, err } func (l ListenSymbol) GetAveLastSymbol(chain, token string) (aveLastSymbolResp AveLastSymbolResp, err error) { client := &http.Client{} url := fmt.Sprintf("https://api.agacve.com/v1api/v4/tokens/treasure/list?chain=%s&sort=created_at&sort_dir=desc&pageNO=1&pageSize=50&category=new&refresh_total=0", chain) req, err := http.NewRequest("GET", url, nil) if err != nil { logger.Error("获取最新交易对列表出错err:", err) return AveLastSymbolResp{}, err } req.Header.Set("accept", "application/json, text/plain, */*") req.Header.Set("accept-language", "zh-CN,zh;q=0.9") req.Header.Set("ave-udid", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36--1739784205001--97086a46-bce0-4ca9-8e0e-78f5075be306") req.Header.Set("cache-control", "no-cache") req.Header.Set("lang", "en") req.Header.Set("origin", "https://ave.ai") req.Header.Set("pragma", "no-cache") req.Header.Set("priority", "u=1, i") req.Header.Set("referer", "https://ave.ai/") req.Header.Set("sec-ch-ua", `"Not A(Brand";v="8", "Chromium";v="132", "Google Chrome";v="132"`) req.Header.Set("sec-ch-ua-mobile", "?0") req.Header.Set("sec-ch-ua-platform", `"Windows"`) req.Header.Set("sec-fetch-dest", "empty") req.Header.Set("sec-fetch-mode", "cors") req.Header.Set("sec-fetch-site", "cross-site") req.Header.Set("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36") req.Header.Set("x-auth", token) resp, err := client.Do(req) if err != nil { logger.Error("获取最新交易对列表出错1err:", err) return AveLastSymbolResp{}, err } defer resp.Body.Close() bodyText, err := io.ReadAll(resp.Body) if err != nil { logger.Error("获取最新交易对列表出错2err:", err) return AveLastSymbolResp{}, err } var res AveLastSymbolResp sonic.Unmarshal(bodyText, &res) return res, nil } // SendEmailNotice 发送邮箱通知 // pair 合约地址 // symbol 完整交易对 // chain 链 func (l ListenSymbol) SendEmailNotice(db *gorm.DB, pair string, symbol string, chain string) { }