2025-02-06 11:14:33 +08:00
package jobs
import (
2025-02-18 15:40:45 +08:00
"context"
2025-02-06 11:14:33 +08:00
"errors"
2025-02-10 17:55:34 +08:00
"fmt"
2025-02-19 09:41:05 +08:00
"github.com/bytedance/sonic"
2025-02-06 11:14:33 +08:00
"go-admin/app/admin/models"
"go-admin/app/admin/service"
"go-admin/app/admin/service/dto"
2025-02-18 15:40:45 +08:00
"go-admin/common/const/rediskey"
2025-02-10 17:55:34 +08:00
"go-admin/common/global"
2025-02-18 15:40:45 +08:00
"go-admin/common/helper"
2025-02-19 10:38:29 +08:00
"go-admin/pkg/emailhelper"
2025-02-10 17:55:34 +08:00
"go-admin/pkg/utility"
"go-admin/pkg/utility/snowflakehelper"
"go-admin/services/binanceservice"
2025-02-06 11:14:33 +08:00
"go-admin/services/fileservice"
2025-02-19 10:38:29 +08:00
"gorm.io/driver/mysql"
2025-02-19 09:41:05 +08:00
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"slices"
2025-02-06 11:14:33 +08:00
"strconv"
"strings"
"time"
2025-02-14 10:54:30 +08:00
"github.com/shopspring/decimal"
2025-02-06 11:14:33 +08:00
"github.com/go-admin-team/go-admin-core/sdk"
"gorm.io/gorm"
"github.com/go-admin-team/go-admin-core/logger"
)
// InitFuturesSymbol is the job type whose Exec method resets the futures
// trading pairs. It carries no state.
type InitFuturesSymbol struct{}
// InitSpotSymbol is the job type whose Exec method resets the spot trading
// pairs. It carries no state.
type InitSpotSymbol struct{}
// ClearLogJob is the job type whose Exec method clears logs through
// fileservice.ClearLogs. It carries no state.
type ClearLogJob struct{}
// AutoPlaceOrder is the job type whose Exec method places orders
// automatically from the enabled order templates. It carries no state.
type AutoPlaceOrder struct{}
2025-02-11 09:39:15 +08:00
// LimitOrderTimeoutDuration is the job type whose Exec method cancels limit
// orders that have been pending longer than the configured timeout and
// re-submits them as market orders. It carries no state.
type LimitOrderTimeoutDuration struct{}
2025-02-19 09:41:05 +08:00
// ListenSymbol is the job type whose Exec method compares the watched
// symbols against the newest pairs reported by the Ave API and sends
// notification emails on a match. It carries no state.
type ListenSymbol struct{}
2025-02-06 11:14:33 +08:00
// Exec initializes the futures trading pairs.
//
// It takes the first database handle yielded by ranging over
// sdk.Runtime.GetDb(), attaches it to a service.LineSymbol and calls
// ResetFuturesSymbol. arg is not used. The method always returns nil and
// logs a "success" line when it exits.
func (t InitFuturesSymbol) Exec(arg interface{}) error {
	// The message (including its timestamp) is built now; only the logging is deferred.
	defer logger.Info(time.Now().Format(timeFormat) + " [INFO] JobCore InitFuturesSymbol exec success")

	var orm *gorm.DB
	for _, conn := range sdk.Runtime.GetDb() {
		orm = conn
		break
	}

	svc := service.LineSymbol{}
	svc.Orm = orm
	// Reset the futures trading pairs.
	svc.ResetFuturesSymbol()
	return nil
}
// Exec initializes the spot trading pairs.
//
// It takes the first database handle yielded by ranging over
// sdk.Runtime.GetDb(), attaches it to a service.LineSymbol and calls
// ResetSpotSymbol. arg is not used. The method always returns nil and logs a
// "success" line when it exits.
func (t InitSpotSymbol) Exec(arg interface{}) error {
	// The message (including its timestamp) is built now; only the logging is deferred.
	defer logger.Info(time.Now().Format(timeFormat) + " [INFO] JobCore InitSpotSymbol exec success")

	var orm *gorm.DB
	for _, conn := range sdk.Runtime.GetDb() {
		orm = conn
		break
	}

	svc := service.LineSymbol{}
	svc.Orm = orm
	// Reset the spot trading pairs.
	svc.ResetSpotSymbol()
	return nil
}
// Exec deletes expired logs.
//
// It takes the first database handle yielded by ranging over
// sdk.Runtime.GetDb() and passes it to fileservice.ClearLogs. arg is not
// used. The method always returns nil and logs a "success" line when it
// exits.
func (t ClearLogJob) Exec(arg interface{}) error {
	// The message (including its timestamp) is built now; only the logging is deferred.
	defer logger.Info(time.Now().Format(timeFormat) + " [INFO] JobCore ClearLogJob exec success")

	var orm *gorm.DB
	for _, conn := range sdk.Runtime.GetDb() {
		orm = conn
		break
	}

	fileservice.ClearLogs(orm)
	return nil
}
// Exec places orders automatically for every enabled order template.
//
// It loads all LineOrderTemplateLogs rows with switch = '1', joins their ids
// into a comma-separated list and hands that list to
// LinePreOrder.QuickAddPreOrder. arg is not used.
//
// It returns the query error, the error returned by QuickAddPreOrder, or a
// single error whose message is the comma-joined text of every error
// QuickAddPreOrder collected. It returns nil when there are no enabled
// templates or everything succeeded.
func (t AutoPlaceOrder) Exec(arg interface{}) error {
	str := time.Now().Format(timeFormat) + " [INFO] JobCore AutoPlaceOrder exec success"
	defer logger.Info(str)

	var db *gorm.DB
	for _, item := range sdk.Runtime.GetDb() {
		db = item
		break
	}

	var templateLogs []models.LineOrderTemplateLogs
	if err := db.Model(&models.LineOrderTemplateLogs{}).Where("switch = '1'").Find(&templateLogs).Error; err != nil {
		return err
	}
	if len(templateLogs) == 0 {
		return nil
	}

	// Length 0 with capacity n: a non-zero length here would leave n empty
	// strings in front of the real ids and corrupt the joined list.
	ids := make([]string, 0, len(templateLogs))
	for _, tpl := range templateLogs {
		ids = append(ids, strconv.Itoa(tpl.Id))
	}

	req := dto.QuickAddPreOrderReq{Ids: strings.Join(ids, ",")}
	preOrderService := service.LinePreOrder{}
	preOrderService.Orm = db

	errs := make([]error, 0)
	if err := preOrderService.QuickAddPreOrder(&req, nil, &errs); err != nil {
		return err
	}
	if len(errs) == 0 {
		return nil
	}

	errStr := make([]string, 0, len(errs))
	for _, e := range errs {
		errStr = append(errStr, e.Error())
	}
	return errors.New(strings.Join(errStr, ","))
}
2025-02-10 17:55:34 +08:00
2025-02-11 09:39:15 +08:00
func ( t LimitOrderTimeoutDuration ) Exec ( arg interface { } ) error {
2025-02-10 17:55:34 +08:00
str := time . Now ( ) . Format ( timeFormat ) + " [INFO] JobCore ClearLogJob exec success"
defer logger . Info ( str )
var db * gorm . DB
for _ , item := range sdk . Runtime . GetDb ( ) {
db = item
break
}
var req = new ( dto . SysConfigByKeyReq )
var resp = new ( dto . GetSysConfigByKEYForServiceResp )
req . ConfigKey = "limitOrderTimeoutDuration"
serviceConfig := service . SysConfig { }
serviceConfig . Orm = db
serviceConfig . GetWithKey ( req , resp )
if resp . ConfigValue == "" {
return nil
}
2025-02-18 15:40:45 +08:00
lock := helper . NewRedisLock ( rediskey . JobReOrderTrigger , 20 , 5 , 100 * time . Millisecond )
if ok , err := lock . AcquireWait ( context . Background ( ) ) ; err != nil {
logger . Error ( "获取锁失败" , err )
return nil
} else if ok {
defer lock . Release ( )
limitOrderTimeoutDuration := utility . StringAsInt64 ( resp . ConfigValue )
orders := make ( [ ] models . LinePreOrder , 0 )
err := db . Model ( & models . LinePreOrder { } ) . Where ( "status = '5' AND main_order_type = 'LIMIT' AND order_type in ('4') AND updated_at < ?" , time . Now ( ) . Add ( - time . Duration ( limitOrderTimeoutDuration ) * time . Second ) ) . Find ( & orders ) . Error
if err != nil {
return err
}
spotApi := binanceservice . SpotRestApi { }
futApi := binanceservice . FutRestApi { }
for _ , order := range orders {
var apiUserinfo models . LineApiUser
db . Model ( & models . LineApiUser { } ) . Where ( "id = ?" , order . ApiId ) . Find ( & apiUserinfo )
//现货
if order . SymbolType == global . SYMBOL_SPOT {
if order . ExchangeType == global . EXCHANGE_BINANCE {
err := t . ReSpotOrderPlace ( db , order , apiUserinfo , spotApi )
if err != nil {
continue
}
2025-02-10 17:55:34 +08:00
}
}
2025-02-18 15:40:45 +08:00
//合约
if order . SymbolType == global . SYMBOL_FUTURES {
if order . ExchangeType == global . EXCHANGE_BINANCE {
err := t . ReFutOrderPlace ( db , order , apiUserinfo , futApi )
if err != nil {
continue
}
2025-02-11 09:39:15 +08:00
}
}
2025-02-18 15:40:45 +08:00
}
2025-02-11 09:39:15 +08:00
}
return nil
}
2025-02-10 17:55:34 +08:00
2025-02-11 09:39:15 +08:00
// ReSpotOrderPlace 重下现货市价单
func ( t LimitOrderTimeoutDuration ) ReSpotOrderPlace ( db * gorm . DB , order models . LinePreOrder , apiUserinfo models . LineApiUser , spotApi binanceservice . SpotRestApi ) error {
var err error
for i := 0 ; i < 3 ; i ++ {
2025-02-18 15:40:45 +08:00
err = spotApi . CancelOpenOrderByOrderSn ( apiUserinfo , order . Symbol , order . OrderSn )
2025-02-11 09:39:15 +08:00
if err == nil || strings . Contains ( err . Error ( ) , "该交易对没有订单" ) {
break
}
}
if err != nil {
logger . Error ( fmt . Sprintf ( "取消现货委托失败:order_sn:%s err:%+v" , order . OrderSn , err ) )
return err
} else {
newClientOrderId := snowflakehelper . GetOrderId ( )
2025-02-18 15:40:45 +08:00
order . Desc = fmt . Sprintf ( "取消限价单,重下市价单源订单号:%s " , order . OrderSn )
order . OrderSn = utility . Int64ToString ( snowflakehelper . GetOrderId ( ) )
order . MainOrderType = "MARKET"
// var newOrder models.LinePreOrder
// copier.Copy(&newOrder, order)
// newOrder.Id = 0
// newOrder.OrderSn = utility.Int64ToString(newClientOrderId)
// newOrder.CreatedAt = time.Now()
// newOrder.MainOrderType = "MARKET"
// err = db.Model(&models.LinePreOrder{}).Create(&newOrder).Error
err := db . Model ( & order ) . Updates ( map [ string ] interface { } { "desc" : order . Desc , "order_sn" : order . OrderSn , "main_order_type" : order . MainOrderType } ) . Error
2025-02-11 09:39:15 +08:00
if err != nil {
logger . Error ( fmt . Sprintf ( "生成新市价单失败 err:%+v" , err ) )
return err
}
params := binanceservice . OrderPlacementService {
ApiId : order . ApiId ,
Symbol : order . Symbol ,
Side : order . Site ,
Type : "MARKET" ,
TimeInForce : "GTC" ,
Price : utility . StringToDecimal ( order . Price ) ,
2025-02-18 15:40:45 +08:00
StopPrice : utility . StrToDecimal ( order . Price ) ,
2025-02-11 09:39:15 +08:00
Quantity : utility . StringToDecimal ( order . Num ) ,
NewClientOrderId : utility . Int64ToString ( newClientOrderId ) ,
2025-02-10 17:55:34 +08:00
}
2025-02-11 09:39:15 +08:00
if err := spotApi . OrderPlace ( db , params ) ; err != nil {
logger . Error ( fmt . Sprintf ( "重新下市价单失败 err:%+v" , err ) )
2025-02-18 15:40:45 +08:00
err := db . Model ( & order ) . Updates ( map [ string ] interface { } { "status" : "2" , "desc" : order . Desc + "err:" + err . Error ( ) } ) . Error
2025-02-10 17:55:34 +08:00
2025-02-11 09:39:15 +08:00
if err != nil {
logger . Error ( "下单失败后修改订单失败" )
return err
}
}
2025-02-10 17:55:34 +08:00
}
2025-02-11 09:39:15 +08:00
2025-02-10 17:55:34 +08:00
return nil
}
2025-02-11 09:39:15 +08:00
// ReFutOrderPlace 重下合约市价单
func ( t LimitOrderTimeoutDuration ) ReFutOrderPlace ( db * gorm . DB , order models . LinePreOrder , apiUserinfo models . LineApiUser , futApi binanceservice . FutRestApi ) error {
var err error
for i := 0 ; i < 3 ; i ++ {
err := futApi . CancelFutOrder ( apiUserinfo , order . Symbol , order . OrderSn )
if err == nil || strings . Contains ( err . Error ( ) , "该交易对没有订单" ) {
break
}
}
if err != nil {
logger . Error ( fmt . Sprintf ( "取消现货委托失败:order_sn:%s err:%+v" , order . OrderSn , err ) )
return err
} else {
newClientOrderId := snowflakehelper . GetOrderId ( )
2025-02-18 15:40:45 +08:00
orderType := "MARKET"
order . Desc = fmt . Sprintf ( "取消限价单,重下市价单 源订单号:%s" , order . OrderSn )
order . OrderSn = utility . Int64ToString ( newClientOrderId )
// var newOrder models.LinePreOrder
// copier.Copy(&newOrder, order)
// newOrder.Id = 0
// newOrder.OrderSn = utility.Int64ToString(newClientOrderId)
// newOrder.CreatedAt = time.Now()
// newOrder.MainOrderType = "MARKET"
// err = db.Model(&models.LinePreOrder{}).Create(&newOrder).Error
err = db . Model ( & order ) . Updates ( map [ string ] interface { } { "desc" : order . Desc , "order_sn" : order . OrderSn } ) . Error
2025-02-11 09:39:15 +08:00
if err != nil {
logger . Error ( fmt . Sprintf ( "生成合约新市价单失败 err:%+v" , err ) )
return err
}
2025-02-18 15:40:45 +08:00
if order . OrderType == 4 {
orderType = "STOP_MARKET"
}
2025-02-11 09:39:15 +08:00
params := binanceservice . FutOrderPlace {
ApiId : order . ApiId ,
Symbol : order . Symbol ,
Side : order . Site ,
Quantity : utility . StringToDecimal ( order . Num ) ,
Price : utility . StringToDecimal ( order . Price ) ,
SideType : "MARKET" ,
OpenOrder : 0 ,
Profit : decimal . Decimal { } ,
2025-02-18 15:40:45 +08:00
StopPrice : utility . StringToDecimal ( order . Price ) ,
OrderType : orderType ,
2025-02-11 09:39:15 +08:00
NewClientOrderId : utility . Int64ToString ( newClientOrderId ) ,
}
if err := futApi . OrderPlace ( db , params ) ; err != nil {
logger . Error ( fmt . Sprintf ( "重新下合约市价单失败 err:%+v" , err ) )
2025-02-18 15:40:45 +08:00
err := db . Model ( & order ) . Updates ( map [ string ] interface { } { "status" : "2" , "desc" : order . Desc + " err:" + err . Error ( ) } ) . Error
2025-02-11 09:39:15 +08:00
if err != nil {
logger . Error ( "下单失败后修改订单失败" )
return err
}
}
}
return err
}
2025-02-19 09:41:05 +08:00
func ( l ListenSymbol ) Exec ( arg interface { } ) error {
str := time . Now ( ) . Format ( timeFormat ) + " [INFO] JobCore ClearLogJob exec success"
defer logger . Info ( str )
var db * gorm . DB
for _ , item := range sdk . Runtime . GetDb ( ) {
db = item
break
}
2025-02-19 10:38:29 +08:00
if db == nil {
dsn := "root:root@tcp(192.168.1.12:3306)/go_exchange_single?charset=utf8mb4&parseTime=True&loc=Local&timeout=1000ms"
db , _ = gorm . Open ( mysql . Open ( dsn ) , & gorm . Config { } )
}
var req = new ( dto . SysConfigByKeyReq )
var configResp = new ( dto . GetSysConfigByKEYForServiceResp )
req . ConfigKey = "EmailAddress"
serviceConfig := service . SysConfig { }
serviceConfig . Orm = db
serviceConfig . GetWithKey ( req , configResp )
if configResp . ConfigValue == "" {
return nil
}
symbols := make ( [ ] models . SpiderListenSymbol , 0 )
err := db . Model ( & models . SpiderListenSymbol { } ) . Find ( & symbols ) . Error
2025-02-19 09:41:05 +08:00
if err != nil {
logger . Error ( "获取监听交易对失败" )
return err
}
2025-02-19 10:38:29 +08:00
var set [ ] string
for _ , symbol := range symbols {
set = append ( set , symbol . Coin + symbol . Currency )
}
if len ( set ) == 0 {
return nil
}
2025-02-19 09:41:05 +08:00
aveToken , _ := helper . DefaultRedis . GetString ( rediskey . AveRequestToken )
if aveToken == "" {
aveToken , err = l . GetAveRequestId ( )
if err != nil {
logger . Error ( "获取请求id失败err:" , err )
return err
}
}
chainSlice := [ ] string { "solana" , "bsc" , "eth" }
for _ , chain := range chainSlice {
symbols , err := l . GetAveLastSymbol ( chain , aveToken )
if err != nil {
continue
}
for _ , symbol := range symbols . Data . Data {
if slices . Contains ( set , symbol . Token0Symbol + symbol . Token1Symbol ) {
//发送邮箱
2025-02-19 10:38:29 +08:00
emails := strings . Split ( configResp . ConfigValue , "," )
for _ , email := range emails {
2025-02-19 11:21:40 +08:00
emailhelper . SendNoticeEmail ( email , chain , symbol . Token0Symbol + "/" + symbol . Token1Symbol , symbol . TargetToken )
2025-02-19 10:38:29 +08:00
}
// 清除数据
db . Model ( & models . SpiderListenSymbol { } ) . Where ( "coin = ? AND currency = ?" , symbol . Token0Symbol , symbol . Token1Symbol ) . Delete ( & models . SpiderListenSymbol { } )
2025-02-19 09:41:05 +08:00
continue
}
}
}
return nil
}
// GetAveRequestId obtains a request token from the Ave captcha endpoint.
//
// It runs `node <dir>/config/ave.js`, where <dir> is two levels above the
// current working directory. The script's combined stdout/stderr is posted
// as "request_id" to the requestToken endpoint. When the response has
// Status == 1, the returned id is cached in redis under
// rediskey.AveRequestToken for 60 seconds and returned.
//
// An error is returned when the working directory cannot be read, the script
// is missing or fails, the HTTP request fails, or the response cannot be read
// or decoded. When the response decodes but Status != 1, the token is empty
// and the error is nil.
func (l ListenSymbol) GetAveRequestId() (aveToken string, err error) {
	workDir, err := os.Getwd()
	if err != nil {
		logger.Error("获取工作目录失败err:", err)
		return "", err
	}

	// The script lives in the config directory two levels above the working directory.
	scriptPath := filepath.Join(filepath.Dir(filepath.Dir(workDir)), "config", "ave.js")
	if _, err = os.Stat(scriptPath); err != nil {
		logger.Error("可执行的js 文件不存在")
		return "", err
	}

	output, err := exec.Command("node", scriptPath).CombinedOutput()
	if err != nil {
		logger.Error("执行出错:err", err)
		return "", err
	}

	payload, err := sonic.Marshal(map[string]string{
		"request_id": string(output),
	})
	if err != nil {
		return "", err
	}

	req, err := http.NewRequest("POST", "https://febweb002.com/v1api/v1/captcha/requestToken", strings.NewReader(string(payload)))
	if err != nil {
		return "", err
	}
	req.Header.Set("accept", "*/*")
	req.Header.Set("accept-language", "zh-CN,zh;q=0.9")
	req.Header.Set("ave-platform", "web")
	req.Header.Set("content-type", "application/json")
	req.Header.Set("origin", "https://ave.ai")
	req.Header.Set("priority", "u=1, i")
	req.Header.Set("referer", "https://ave.ai/")
	req.Header.Set("sec-ch-ua", ` "Not A(Brand";v="8", "Chromium";v="132", "Google Chrome";v="132" `)
	req.Header.Set("sec-ch-ua-mobile", "?0")
	req.Header.Set("sec-ch-ua-platform", ` "Windows" `)
	req.Header.Set("sec-fetch-dest", "empty")
	req.Header.Set("sec-fetch-mode", "cors")
	req.Header.Set("sec-fetch-site", "cross-site")
	req.Header.Set("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36")

	// Bound the call so a stalled endpoint cannot hang the scheduled job.
	client := &http.Client{Timeout: 15 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		logger.Error("请求失败err:", err)
		return "", err
	}
	defer resp.Body.Close()

	bodyText, err := io.ReadAll(resp.Body)
	if err != nil {
		logger.Error("读取响应失败err:", err)
		return "", err
	}

	var res RequestId
	if err = sonic.Unmarshal(bodyText, &res); err != nil {
		logger.Error("解析响应失败err:", err)
		return "", err
	}
	if res.Status == 1 {
		aveToken = res.Data.Id
		helper.DefaultRedis.SetStringExpire(rediskey.AveRequestToken, res.Data.Id, 60*time.Second)
	}
	return aveToken, nil
}
// GetAveLastSymbol fetches the newest token pairs for one chain from the Ave
// "treasure list" endpoint.
//
// chain is placed into the query string as-is. The request asks for page 1
// with 50 entries, category "new", sorted by created_at descending. token is
// sent in the "x-auth" header.
//
// It returns the decoded response. On any failure (building the request,
// performing it, reading the body, decoding the JSON) it returns a zero
// AveLastSymbolResp together with the error.
func (l ListenSymbol) GetAveLastSymbol(chain, token string) (aveLastSymbolResp AveLastSymbolResp, err error) {
	endpoint := fmt.Sprintf("https://api.agacve.com/v1api/v4/tokens/treasure/list?chain=%s&sort=created_at&sort_dir=desc&pageNO=1&pageSize=50&category=new&refresh_total=0", chain)
	req, err := http.NewRequest("GET", endpoint, nil)
	if err != nil {
		logger.Error("获取最新交易对列表出错err:", err)
		return AveLastSymbolResp{}, err
	}
	req.Header.Set("accept", "application/json, text/plain, */*")
	req.Header.Set("accept-language", "zh-CN,zh;q=0.9")
	req.Header.Set("ave-udid", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36--1739784205001--97086a46-bce0-4ca9-8e0e-78f5075be306")
	req.Header.Set("cache-control", "no-cache")
	req.Header.Set("lang", "en")
	req.Header.Set("origin", "https://ave.ai")
	req.Header.Set("pragma", "no-cache")
	req.Header.Set("priority", "u=1, i")
	req.Header.Set("referer", "https://ave.ai/")
	req.Header.Set("sec-ch-ua", ` "Not A(Brand";v="8", "Chromium";v="132", "Google Chrome";v="132" `)
	req.Header.Set("sec-ch-ua-mobile", "?0")
	req.Header.Set("sec-ch-ua-platform", ` "Windows" `)
	req.Header.Set("sec-fetch-dest", "empty")
	req.Header.Set("sec-fetch-mode", "cors")
	req.Header.Set("sec-fetch-site", "cross-site")
	req.Header.Set("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36")
	req.Header.Set("x-auth", token)

	// Bound the call so a stalled endpoint cannot hang the scheduled job.
	client := &http.Client{Timeout: 15 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		logger.Error("获取最新交易对列表出错1err:", err)
		return AveLastSymbolResp{}, err
	}
	defer resp.Body.Close()

	bodyText, err := io.ReadAll(resp.Body)
	if err != nil {
		logger.Error("获取最新交易对列表出错2err:", err)
		return AveLastSymbolResp{}, err
	}

	var res AveLastSymbolResp
	if err = sonic.Unmarshal(bodyText, &res); err != nil {
		logger.Error("获取最新交易对列表出错3err:", err)
		return AveLastSymbolResp{}, err
	}
	return res, nil
}