Files
exchange_go/app/jobs/jobs.go

322 lines
9.0 KiB
Go
Raw Normal View History

2025-02-06 11:14:33 +08:00
package jobs
import (
2025-02-18 15:40:45 +08:00
"context"
2025-02-06 11:14:33 +08:00
"errors"
2025-02-10 17:55:34 +08:00
"fmt"
2025-02-06 11:14:33 +08:00
"go-admin/app/admin/models"
"go-admin/app/admin/service"
"go-admin/app/admin/service/dto"
2025-02-18 15:40:45 +08:00
"go-admin/common/const/rediskey"
2025-02-10 17:55:34 +08:00
"go-admin/common/global"
2025-02-18 15:40:45 +08:00
"go-admin/common/helper"
2025-02-10 17:55:34 +08:00
"go-admin/pkg/utility"
"go-admin/pkg/utility/snowflakehelper"
"go-admin/services/binanceservice"
2025-02-06 11:14:33 +08:00
"go-admin/services/fileservice"
"strconv"
"strings"
"time"
2025-02-14 10:54:30 +08:00
"github.com/shopspring/decimal"
2025-02-06 11:14:33 +08:00
"github.com/go-admin-team/go-admin-core/sdk"
"gorm.io/gorm"
"github.com/go-admin-team/go-admin-core/logger"
)
// InitFuturesSymbol is the job whose Exec method resets the futures trading
// pairs. It carries no state.
type InitFuturesSymbol struct{}
// InitSpotSymbol is the job whose Exec method resets the spot trading pairs.
// It carries no state.
type InitSpotSymbol struct{}
// ClearLogJob is the job whose Exec method calls fileservice.ClearLogs.
// It carries no state.
type ClearLogJob struct{}
// AutoPlaceOrder is the job whose Exec method creates pre-orders from the
// order templates that are switched on. It carries no state.
type AutoPlaceOrder struct{}
2025-02-11 09:39:15 +08:00
// LimitOrderTimeoutDuration is the job whose Exec method re-places timed-out
// limit orders as market orders. It carries no state.
type LimitOrderTimeoutDuration struct{}
2025-02-06 11:14:33 +08:00
// Exec resets the futures trading pairs.
//
// It takes the first connection produced by ranging over sdk.Runtime.GetDb(),
// assigns it to a service.LineSymbol and calls ResetFuturesSymbol on it.
// arg is not used and the returned error is always nil. The "exec success"
// line is logged by a deferred call, i.e. when the method returns.
func (t InitFuturesSymbol) Exec(arg interface{}) error {
	// The message, including its timestamp, is built before any work starts.
	doneMsg := time.Now().Format(timeFormat) + " [INFO] JobCore InitFuturesSymbol exec success"
	defer logger.Info(doneMsg)

	var orm *gorm.DB
	for _, conn := range sdk.Runtime.GetDb() {
		orm = conn
		break
	}

	symbols := service.LineSymbol{}
	symbols.Orm = orm
	// Reset the futures trading pairs.
	symbols.ResetFuturesSymbol()
	return nil
}
// Exec resets the spot trading pairs.
//
// It takes the first connection produced by ranging over sdk.Runtime.GetDb(),
// assigns it to a service.LineSymbol and calls ResetSpotSymbol on it.
// arg is not used and the returned error is always nil. The "exec success"
// line is logged by a deferred call, i.e. when the method returns.
func (t InitSpotSymbol) Exec(arg interface{}) error {
	// The message, including its timestamp, is built before any work starts.
	doneMsg := time.Now().Format(timeFormat) + " [INFO] JobCore InitSpotSymbol exec success"
	defer logger.Info(doneMsg)

	var orm *gorm.DB
	for _, conn := range sdk.Runtime.GetDb() {
		orm = conn
		break
	}

	symbols := service.LineSymbol{}
	symbols.Orm = orm
	// Reset the spot trading pairs.
	symbols.ResetSpotSymbol()
	return nil
}
// Exec deletes expired logs.
//
// It takes the first connection produced by ranging over sdk.Runtime.GetDb()
// and passes it to fileservice.ClearLogs. arg is not used and the returned
// error is always nil. The "exec success" line is logged by a deferred call,
// i.e. when the method returns.
func (t ClearLogJob) Exec(arg interface{}) error {
	// The message, including its timestamp, is built before any work starts.
	doneMsg := time.Now().Format(timeFormat) + " [INFO] JobCore ClearLogJob exec success"
	defer logger.Info(doneMsg)

	var orm *gorm.DB
	for _, conn := range sdk.Runtime.GetDb() {
		orm = conn
		break
	}

	fileservice.ClearLogs(orm)
	return nil
}
// Exec runs the automatic order placement job.
//
// It loads every models.LineOrderTemplateLogs row whose switch column is '1',
// joins the row ids with commas and passes them to
// service.LinePreOrder.QuickAddPreOrder. arg is not used.
//
// It returns the template query error, or the error returned by
// QuickAddPreOrder. When QuickAddPreOrder instead reports errors through the
// error slice it is given, their messages are merged into one comma-separated
// error. nil is returned when no template is switched on or nothing failed.
// The "exec success" line is logged by a deferred call regardless of outcome.
func (t AutoPlaceOrder) Exec(arg interface{}) error {
	doneMsg := time.Now().Format(timeFormat) + " [INFO] JobCore AutoPlaceOrder exec success"
	defer logger.Info(doneMsg)

	var db *gorm.DB
	for _, conn := range sdk.Runtime.GetDb() {
		db = conn
		break
	}

	var templates []models.LineOrderTemplateLogs
	if err := db.Model(&models.LineOrderTemplateLogs{}).Where("switch = '1'").Find(&templates).Error; err != nil {
		return err
	}
	if len(templates) == 0 {
		return nil
	}

	// Length 0 with capacity n: make([]string, n) followed by append would
	// leave n empty strings in front of the real ids and yield ",,1,2".
	ids := make([]string, 0, len(templates))
	for i := range templates {
		ids = append(ids, strconv.Itoa(templates[i].Id))
	}

	req := dto.QuickAddPreOrderReq{Ids: strings.Join(ids, ",")}
	preOrders := service.LinePreOrder{}
	preOrders.Orm = db

	itemErrs := make([]error, 0)
	if err := preOrders.QuickAddPreOrder(&req, nil, &itemErrs); err != nil {
		return err
	}
	if len(itemErrs) == 0 {
		return nil
	}

	msgs := make([]string, 0, len(itemErrs))
	for _, itemErr := range itemErrs {
		msgs = append(msgs, itemErr.Error())
	}
	return errors.New(strings.Join(msgs, ","))
}
2025-02-10 17:55:34 +08:00
2025-02-11 09:39:15 +08:00
func (t LimitOrderTimeoutDuration) Exec(arg interface{}) error {
2025-02-10 17:55:34 +08:00
str := time.Now().Format(timeFormat) + " [INFO] JobCore ClearLogJob exec success"
defer logger.Info(str)
var db *gorm.DB
for _, item := range sdk.Runtime.GetDb() {
db = item
break
}
var req = new(dto.SysConfigByKeyReq)
var resp = new(dto.GetSysConfigByKEYForServiceResp)
req.ConfigKey = "limitOrderTimeoutDuration"
serviceConfig := service.SysConfig{}
serviceConfig.Orm = db
serviceConfig.GetWithKey(req, resp)
if resp.ConfigValue == "" {
return nil
}
2025-02-18 15:40:45 +08:00
lock := helper.NewRedisLock(rediskey.JobReOrderTrigger, 20, 5, 100*time.Millisecond)
if ok, err := lock.AcquireWait(context.Background()); err != nil {
logger.Error("获取锁失败", err)
return nil
} else if ok {
defer lock.Release()
limitOrderTimeoutDuration := utility.StringAsInt64(resp.ConfigValue)
orders := make([]models.LinePreOrder, 0)
err := db.Model(&models.LinePreOrder{}).Where("status = '5' AND main_order_type = 'LIMIT' AND order_type in ('4') AND updated_at < ?", time.Now().Add(-time.Duration(limitOrderTimeoutDuration)*time.Second)).Find(&orders).Error
if err != nil {
return err
}
spotApi := binanceservice.SpotRestApi{}
futApi := binanceservice.FutRestApi{}
for _, order := range orders {
var apiUserinfo models.LineApiUser
db.Model(&models.LineApiUser{}).Where("id = ?", order.ApiId).Find(&apiUserinfo)
//现货
if order.SymbolType == global.SYMBOL_SPOT {
if order.ExchangeType == global.EXCHANGE_BINANCE {
err := t.ReSpotOrderPlace(db, order, apiUserinfo, spotApi)
if err != nil {
continue
}
2025-02-10 17:55:34 +08:00
}
}
2025-02-18 15:40:45 +08:00
//合约
if order.SymbolType == global.SYMBOL_FUTURES {
if order.ExchangeType == global.EXCHANGE_BINANCE {
err := t.ReFutOrderPlace(db, order, apiUserinfo, futApi)
if err != nil {
continue
}
2025-02-11 09:39:15 +08:00
}
}
2025-02-18 15:40:45 +08:00
}
2025-02-11 09:39:15 +08:00
}
return nil
}
2025-02-10 17:55:34 +08:00
2025-02-11 09:39:15 +08:00
// ReSpotOrderPlace 重下现货市价单
func (t LimitOrderTimeoutDuration) ReSpotOrderPlace(db *gorm.DB, order models.LinePreOrder, apiUserinfo models.LineApiUser, spotApi binanceservice.SpotRestApi) error {
var err error
for i := 0; i < 3; i++ {
2025-02-18 15:40:45 +08:00
err = spotApi.CancelOpenOrderByOrderSn(apiUserinfo, order.Symbol, order.OrderSn)
2025-02-11 09:39:15 +08:00
if err == nil || strings.Contains(err.Error(), "该交易对没有订单") {
break
}
}
if err != nil {
logger.Error(fmt.Sprintf("取消现货委托失败:order_sn:%s err:%+v", order.OrderSn, err))
return err
} else {
newClientOrderId := snowflakehelper.GetOrderId()
2025-02-18 15:40:45 +08:00
order.Desc = fmt.Sprintf("取消限价单,重下市价单源订单号:%s ", order.OrderSn)
order.OrderSn = utility.Int64ToString(snowflakehelper.GetOrderId())
order.MainOrderType = "MARKET"
// var newOrder models.LinePreOrder
// copier.Copy(&newOrder, order)
// newOrder.Id = 0
// newOrder.OrderSn = utility.Int64ToString(newClientOrderId)
// newOrder.CreatedAt = time.Now()
// newOrder.MainOrderType = "MARKET"
// err = db.Model(&models.LinePreOrder{}).Create(&newOrder).Error
err := db.Model(&order).Updates(map[string]interface{}{"desc": order.Desc, "order_sn": order.OrderSn, "main_order_type": order.MainOrderType}).Error
2025-02-11 09:39:15 +08:00
if err != nil {
logger.Error(fmt.Sprintf("生成新市价单失败 err:%+v", err))
return err
}
params := binanceservice.OrderPlacementService{
ApiId: order.ApiId,
Symbol: order.Symbol,
Side: order.Site,
Type: "MARKET",
TimeInForce: "GTC",
Price: utility.StringToDecimal(order.Price),
2025-02-18 15:40:45 +08:00
StopPrice: utility.StrToDecimal(order.Price),
2025-02-11 09:39:15 +08:00
Quantity: utility.StringToDecimal(order.Num),
NewClientOrderId: utility.Int64ToString(newClientOrderId),
2025-02-10 17:55:34 +08:00
}
2025-02-11 09:39:15 +08:00
if err := spotApi.OrderPlace(db, params); err != nil {
logger.Error(fmt.Sprintf("重新下市价单失败 err:%+v", err))
2025-02-18 15:40:45 +08:00
err := db.Model(&order).Updates(map[string]interface{}{"status": "2", "desc": order.Desc + "err:" + err.Error()}).Error
2025-02-10 17:55:34 +08:00
2025-02-11 09:39:15 +08:00
if err != nil {
logger.Error("下单失败后修改订单失败")
return err
}
}
2025-02-10 17:55:34 +08:00
}
2025-02-11 09:39:15 +08:00
2025-02-10 17:55:34 +08:00
return nil
}
2025-02-11 09:39:15 +08:00
// ReFutOrderPlace 重下合约市价单
func (t LimitOrderTimeoutDuration) ReFutOrderPlace(db *gorm.DB, order models.LinePreOrder, apiUserinfo models.LineApiUser, futApi binanceservice.FutRestApi) error {
var err error
for i := 0; i < 3; i++ {
err := futApi.CancelFutOrder(apiUserinfo, order.Symbol, order.OrderSn)
if err == nil || strings.Contains(err.Error(), "该交易对没有订单") {
break
}
}
if err != nil {
logger.Error(fmt.Sprintf("取消现货委托失败:order_sn:%s err:%+v", order.OrderSn, err))
return err
} else {
newClientOrderId := snowflakehelper.GetOrderId()
2025-02-18 15:40:45 +08:00
orderType := "MARKET"
order.Desc = fmt.Sprintf("取消限价单,重下市价单 源订单号:%s", order.OrderSn)
order.OrderSn = utility.Int64ToString(newClientOrderId)
// var newOrder models.LinePreOrder
// copier.Copy(&newOrder, order)
// newOrder.Id = 0
// newOrder.OrderSn = utility.Int64ToString(newClientOrderId)
// newOrder.CreatedAt = time.Now()
// newOrder.MainOrderType = "MARKET"
// err = db.Model(&models.LinePreOrder{}).Create(&newOrder).Error
err = db.Model(&order).Updates(map[string]interface{}{"desc": order.Desc, "order_sn": order.OrderSn}).Error
2025-02-11 09:39:15 +08:00
if err != nil {
logger.Error(fmt.Sprintf("生成合约新市价单失败 err:%+v", err))
return err
}
2025-02-18 15:40:45 +08:00
if order.OrderType == 4 {
orderType = "STOP_MARKET"
}
2025-02-11 09:39:15 +08:00
params := binanceservice.FutOrderPlace{
ApiId: order.ApiId,
Symbol: order.Symbol,
Side: order.Site,
Quantity: utility.StringToDecimal(order.Num),
Price: utility.StringToDecimal(order.Price),
SideType: "MARKET",
OpenOrder: 0,
Profit: decimal.Decimal{},
2025-02-18 15:40:45 +08:00
StopPrice: utility.StringToDecimal(order.Price),
OrderType: orderType,
2025-02-11 09:39:15 +08:00
NewClientOrderId: utility.Int64ToString(newClientOrderId),
}
if err := futApi.OrderPlace(db, params); err != nil {
logger.Error(fmt.Sprintf("重新下合约市价单失败 err:%+v", err))
2025-02-18 15:40:45 +08:00
err := db.Model(&order).Updates(map[string]interface{}{"status": "2", "desc": order.Desc + " err:" + err.Error()}).Error
2025-02-11 09:39:15 +08:00
if err != nil {
logger.Error("下单失败后修改订单失败")
return err
}
}
}
return err
}