Files
exchange_go/app/admin/service/line_api_user.go
2025-03-27 16:18:32 +08:00

417 lines
9.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"errors"
"fmt"
"strings"
"github.com/bytedance/sonic"
"github.com/go-admin-team/go-admin-core/logger"
"github.com/go-admin-team/go-admin-core/sdk/service"
"gorm.io/gorm"
"go-admin/app/admin/models"
"go-admin/app/admin/service/dto"
"go-admin/common/actions"
"go-admin/common/const/rediskey"
cDto "go-admin/common/dto"
"go-admin/common/global"
"go-admin/common/helper"
"go-admin/models/binancedto"
"go-admin/services/excservice"
)
// LineApiUser is the service for models.LineApiUser records. It embeds
// service.Service; the methods in this file use the embedded e.Orm database
// handle and e.Log logger.
type LineApiUser struct {
service.Service
}
// GetPage loads one page of LineApiUser rows into list and writes the total
// number of matching rows into count.
//
// Filters come from c.GetNeedSearch() plus the optional ExchangeType and
// OpenStatus request fields; p restricts the visible rows through
// actions.Permission. After the query, each row is decorated with the last
// spot/futures subscription times read from Redis (key built from
// global.USER_SUBSCRIBE and the row's ApiKey). That decoration is best effort:
// a missing or undecodable Redis value never fails the request.
//
// Returns the database error, if any.
func (e *LineApiUser) GetPage(c *dto.LineApiUserGetPageReq, p *actions.DataPermission, list *[]models.LineApiUser, count *int64) error {
	var data models.LineApiUser

	query := e.Orm.Model(&data).
		Scopes(
			cDto.MakeCondition(c.GetNeedSearch()),
			cDto.Paginate(c.GetPageSize(), c.GetPageIndex()),
			actions.Permission(data.TableName(), p),
		)
	// Keep the *gorm.DB returned by Where instead of relying on in-place
	// mutation of the statement, matching GetUser and GetMainUser.
	if c.ExchangeType != "" {
		query = query.Where("exchange_type = ?", c.ExchangeType)
	}
	if c.OpenStatus != nil {
		query = query.Where("open_status = ?", c.OpenStatus)
	}

	// Limit(-1)/Offset(-1) cancel the pagination so Count sees every match.
	err := query.
		Find(list).Limit(-1).Offset(-1).
		Count(count).Error
	if err != nil {
		e.Log.Errorf("LineApiUserService GetPage error:%s \r\n", err)
		return err
	}

	for index := range *list {
		item := &(*list)[index]

		// The lookup error is deliberately ignored: an empty value simply
		// means there is nothing to show for this row.
		val, _ := helper.DefaultRedis.GetString(fmt.Sprintf(global.USER_SUBSCRIBE, item.ApiKey))
		if val == "" {
			continue
		}

		// Decode into a fresh struct per row. Reusing one struct across
		// iterations would leak the previous row's timestamps whenever the
		// current JSON omits a field.
		var userSub binancedto.UserSubscribeState
		if err := sonic.Unmarshal([]byte(val), &userSub); err != nil {
			e.Log.Errorf("LineApiUserService GetPage unmarshal subscribe state error:%s \r\n", err)
			continue
		}

		if userSub.FuturesLastTime != nil {
			item.FuturesLastTime = userSub.FuturesLastTime.Format("2006-01-02 15:04:05")
		}
		if userSub.SpotLastTime != nil {
			item.SpotLastTime = userSub.SpotLastTime.Format("2006-01-02 15:04:05")
		}
	}

	return nil
}
// Get loads the LineApiUser identified by d.GetId() into model, restricted to
// the rows p allows through actions.Permission.
//
// When no row matches, a generic "not found or not permitted" error is logged
// and returned in place of gorm.ErrRecordNotFound; any other database error is
// logged and returned unchanged.
func (e *LineApiUser) Get(d *dto.LineApiUserGetReq, p *actions.DataPermission, model *models.LineApiUser) error {
	var data models.LineApiUser

	err := e.Orm.Model(&data).
		Scopes(actions.Permission(data.TableName(), p)).
		First(model, d.GetId()).Error

	switch {
	case err == nil:
		return nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		notFound := errors.New("查看对象不存在或无权查看")
		e.Log.Errorf("Service GetLineApiUser error:%s \r\n", notFound)
		return notFound
	default:
		e.Log.Errorf("db error:%s", err)
		return err
	}
}
// Insert creates a new LineApiUser from c, caches it via saveCache and pushes
// its JSON form onto the rediskey.ApiUserActiveList Redis list.
//
// Only a failed database insert is returned as an error. Cache and queue
// failures are logged (now including the underlying error) and otherwise
// ignored, so the created row is still reported as a success.
func (e *LineApiUser) Insert(c *dto.LineApiUserInsertReq) error {
	var data models.LineApiUser
	c.Generate(&data)

	if err := e.Orm.Create(&data).Error; err != nil {
		e.Log.Errorf("LineApiUserService Insert error:%s \r\n", err)
		return err
	}

	e.saveCache(data)

	val, err := sonic.MarshalString(&data)
	if err != nil {
		// Previously swallowed: without the payload nothing can be queued.
		logger.Error("添加待连接队列失败:", err)
		return nil
	}
	if val != "" {
		if err := helper.DefaultRedis.RPushList(rediskey.ApiUserActiveList, val); err != nil {
			logger.Error("添加待连接队列失败:", err)
		}
	}

	return nil
}
// restartWebsocket stops the spot and futures sockets registered in
// excservice for data.ApiKey (when present and non-nil), refreshes the cached
// copy of data, then calls OpenUserBinanceWebsocket for it.
func (e *LineApiUser) restartWebsocket(data models.LineApiUser) {
	if spot, found := excservice.SpotSockets[data.ApiKey]; found && spot != nil {
		spot.Stop()
	}

	if futures, found := excservice.FutureSockets[data.ApiKey]; found && futures != nil {
		futures.Stop()
	}

	e.saveCache(data)
	OpenUserBinanceWebsocket(data)
}
// saveCache stores the JSON form of data in Redis under the key built from
// rediskey.API_USER and data.Id. Nothing is written when serialization yields
// an empty string; a failed Redis write is logged and not returned.
func (e *LineApiUser) saveCache(data models.LineApiUser) {
	payload, _ := sonic.MarshalString(&data)
	if payload == "" {
		return
	}

	cacheKey := fmt.Sprintf(rediskey.API_USER, data.Id)
	err := helper.DefaultRedis.SetString(cacheKey, payload)
	if err != nil {
		e.Log.Error("缓存api user失败:", err)
	}
}
// OpenUserBinanceWebsocket opens, or restarts, the spot and futures websocket
// managers for item.ApiKey and records them in excservice.SpotSockets and
// excservice.FutureSockets.
//
// item.IpAddress is expected in the form "<proxyType>://<host>". When it
// splits into exactly two parts around "://", the first becomes the proxy type
// and the second the proxy address, prefixed with "<UserPass>@" if
// item.UserPass is set. Otherwise both are passed on as empty strings.
func OpenUserBinanceWebsocket(item models.LineApiUser) {
	proxyType := ""
	ipAddress := ""

	// Normalize a full-width colon (U+FF1A) to ASCII before looking for
	// "://". The previous literal was an empty string, which makes ReplaceAll
	// insert ":" between every character so that "://" could never match.
	// NOTE(review): the original literal was most likely the full-width colon
	// (the file viewer warned about ambiguous Unicode) — confirm.
	normalized := strings.ReplaceAll(item.IpAddress, "\uFF1A", ":")
	ips := strings.Split(normalized, "://")
	if len(ips) == 2 {
		proxyType = ips[0]
		if item.UserPass != "" {
			ipAddress = fmt.Sprintf("%s@%s", item.UserPass, ips[1])
		} else {
			ipAddress = ips[1]
		}
	}

	// Spot. A nil entry is treated as missing, as restartWebsocket does, so a
	// stale nil value is replaced rather than dereferenced.
	if wm, ok := excservice.SpotSockets[item.ApiKey]; ok && wm != nil {
		wm.Restart(item.ApiKey, item.ApiSecret, proxyType, ipAddress)
	} else {
		spotSocket := excservice.NewBinanceWebSocketManager(0, item.ApiKey, item.ApiSecret, proxyType, ipAddress)
		spotSocket.Start()
		excservice.SpotSockets[item.ApiKey] = spotSocket
	}

	// Futures.
	if wm, ok := excservice.FutureSockets[item.ApiKey]; ok && wm != nil {
		wm.Restart(item.ApiKey, item.ApiSecret, proxyType, ipAddress)
	} else {
		futureSocket := excservice.NewBinanceWebSocketManager(1, item.ApiKey, item.ApiSecret, proxyType, ipAddress)
		futureSocket.Start()
		excservice.FutureSockets[item.ApiKey] = futureSocket
	}
}
// Update applies c to the LineApiUser identified by c.GetId(), restricted to
// the rows p allows through actions.Permission.
//
// In one transaction it saves the user and the groups returned by
// GetUpdateGroups (groups with this user's id removed from their UserId
// list). Afterwards it refreshes the Redis cache, queues the old API key on
// rediskey.ApiUserDeleteList when the key changed, and queues the updated user
// on rediskey.ApiUserActiveList. Queue failures are logged, not returned.
//
// Returns an error when the row cannot be loaded (missing or not permitted),
// when the affected groups cannot be read, or when the transaction fails.
func (e *LineApiUser) Update(c *dto.LineApiUserUpdateReq, p *actions.DataPermission) error {
	var data = models.LineApiUser{}

	// Stop here if the row is missing or hidden by the permission scope.
	// Ignoring this error would let Save below write the request onto a
	// zero-value struct.
	err := e.Orm.Scopes(
		actions.Permission(data.TableName(), p),
	).First(&data, c.GetId()).Error
	if err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return errors.New("无权更新该数据")
		}
		e.Log.Errorf("LineApiUserService Update error:%s \r\n", err)
		return err
	}

	oldApiKey := data.ApiKey
	c.Generate(&data)

	// This error used to be overwritten by the transaction result before it
	// was ever inspected.
	updateGroups, err := e.GetUpdateGroups(c.Id)
	if err != nil {
		e.Log.Errorf("LineApiUserService Update error:%s \r\n", err)
		return err
	}

	err = e.Orm.Transaction(func(tx *gorm.DB) error {
		db := tx.Save(&data)
		if err := db.Error; err != nil {
			e.Log.Errorf("LineApiUserService Save error:%s \r\n", err)
			return err
		}
		if db.RowsAffected == 0 {
			return errors.New("无权更新该数据")
		}

		if len(updateGroups) > 0 {
			if err := tx.Save(&updateGroups).Error; err != nil {
				return err
			}
		}

		return nil
	})
	if err != nil {
		return err
	}

	e.saveCache(data)

	// When the API key changed, queue the old key so its websocket is closed.
	if oldApiKey != data.ApiKey {
		if err := helper.DefaultRedis.RPushList(rediskey.ApiUserDeleteList, oldApiKey); err != nil {
			logger.Error("写入待关闭websocket api key 失败:", err)
		}
	}

	val, _ := sonic.MarshalString(&data)
	if val != "" {
		if err := helper.DefaultRedis.RPushList(rediskey.ApiUserActiveList, val); err != nil {
			logger.Error("写入待激活websocket api key 失败:", err)
		}
	}

	return nil
}
// GetUpdateGroups returns every LineApiUserGroup whose comma-separated UserId
// list contains apiUserId, with that id removed from the returned copies.
// The groups are not saved here; the caller persists them.
//
// Returns a nil slice and the database error if the lookup fails.
func (e *LineApiUser) GetUpdateGroups(apiUserId int) ([]models.LineApiUserGroup, error) {
	var groups []models.LineApiUserGroup
	if err := e.Orm.Model(models.LineApiUserGroup{}).Where(" concat(',',user_id,',') like ? ", fmt.Sprintf("%%,%v,%%", apiUserId)).Find(&groups).Error; err != nil {
		return nil, err
	}

	target := fmt.Sprint(apiUserId)
	updateGroups := make([]models.LineApiUserGroup, 0, len(groups))
	for _, group := range groups {
		// Filter entry by entry. Replacing ",id," with "" glued the
		// neighbouring ids together ("1,2,3" minus 2 became "13") and missed
		// back-to-back duplicates.
		parts := strings.Split(group.UserId, ",")
		kept := make([]string, 0, len(parts))
		for _, part := range parts {
			if part != target {
				kept = append(kept, part)
			}
		}
		group.UserId = strings.Trim(strings.Join(kept, ","), ",")
		updateGroups = append(updateGroups, group)
	}

	return updateGroups, nil
}
// Remove deletes the LineApiUser rows selected by d.GetId(), restricted to the
// rows p allows through actions.Permission, then drops the matching
// rediskey.API_USER cache keys for every id in d.Ids.
//
// Returns the database error, or a "not permitted" error when no row was
// affected. A failed cache cleanup is logged and not returned.
func (e *LineApiUser) Remove(d *dto.LineApiUserDeleteReq, p *actions.DataPermission) error {
	var data models.LineApiUser

	result := e.Orm.Model(&data).
		Scopes(actions.Permission(data.TableName(), p)).
		Delete(&data, d.GetId())
	if result.Error != nil {
		e.Log.Errorf("Service RemoveLineApiUser error:%s \r\n", result.Error)
		return result.Error
	}
	if result.RowsAffected == 0 {
		return errors.New("无权删除该数据")
	}

	// Evict the cached copies of the deleted users.
	cacheKeys := make([]string, 0)
	for _, id := range d.Ids {
		cacheKeys = append(cacheKeys, fmt.Sprintf(rediskey.API_USER, id))
	}
	if _, err := helper.DefaultRedis.BatchDeleteKeys(cacheKeys); err != nil {
		e.Log.Error("批量删除api_user key失败", err)
	}

	return nil
}
// Bind links two API users into a new LineApiGroup.
//
// req.UserIdStr must hold exactly two distinct comma-separated user ids, and
// neither user may already have a subordinate value other than '0'. Inside one
// transaction the group is created, the first id is marked subordinate "1" and
// the second "2" (GetMainUser selects subordinate '1' as the main account),
// and both receive the new group id. On success the group is passed to
// SetApiGroupCache.
//
// p is accepted for signature compatibility and is not used by this method.
//
// Returns a validation error, the pre-check database error, or a generic
// "operation failed" error when the transaction fails (the cause is logged).
func (e *LineApiUser) Bind(req *dto.LineApiUserBindSubordinateReq, p *actions.DataPermission) error {
	split := strings.Split(req.UserIdStr, ",")
	// The same id twice is not "two accounts" either.
	if len(split) != 2 || split[0] == split[1] {
		return errors.New("绑定关系必须是两个账号")
	}

	for _, s := range split {
		var apiUserinfo models.LineApiUser
		err := e.Orm.Model(&models.LineApiUser{}).Where("id = ? AND subordinate <> '0'", s).Find(&apiUserinfo).Error
		if err != nil {
			// A failed lookup must not be read as "not bound yet".
			e.Log.Errorf("LineApiUserService Bind error:%s \r\n", err)
			return err
		}
		if apiUserinfo.Id > 0 {
			return errors.New("该用户已经绑定关系")
		}
	}

	group := models.LineApiGroup{
		GroupName: req.GroupName,
		ApiUserId: req.UserIdStr,
	}

	err := e.Orm.Transaction(func(tx *gorm.DB) error {
		if err := tx.Model(&models.LineApiGroup{}).Create(&group).Error; err != nil {
			return err
		}

		for i, s := range split {
			subordinate := "2"
			if i == 0 {
				subordinate = "1"
			}

			// Use tx, not e.Orm: the updates previously ran outside the
			// transaction and were not rolled back on failure.
			err := tx.Model(&models.LineApiUser{}).Where("id = ?", s).Updates(map[string]interface{}{
				"subordinate": subordinate,
				"group_id":    group.Id,
			}).Error
			if err != nil {
				return err
			}
		}

		return nil
	})
	if err != nil {
		e.Log.Errorf("LineApiUserService Bind error:%s \r\n", err)
		return errors.New("操作失败")
	}

	SetApiGroupCache(group)
	return nil
}
// GetUser fills list with the LineApiUser rows whose subordinate column is
// '0' (users not yet bound), optionally narrowed to req.ExchangeType.
// Returns the database error, if any, after logging it.
func (e *LineApiUser) GetUser(req *dto.LineApiUserBindSubordinateReq, list *[]models.LineApiUser) error {
	unbound := e.Orm.Model(&models.LineApiUser{}).Where("subordinate = '0'")
	if req.ExchangeType != "" {
		unbound = unbound.Where("exchange_type = ?", req.ExchangeType)
	}

	if err := unbound.Find(list).Error; err != nil {
		e.Log.Errorf("LineApiUserService error:%s \r\n", err)
		return err
	}
	return nil
}
// GetMainUser fills list with the main accounts: LineApiUser rows with
// subordinate '1' and a positive group_id, optionally narrowed to
// req.ExchangeType. Returns the database error, if any, after logging it.
func (e *LineApiUser) GetMainUser(req *dto.GetMainUserReq, list *[]models.LineApiUser) error {
	mainUsers := e.Orm.Model(&models.LineApiUser{}).Where("subordinate = '1' AND group_id > 0")
	if req.ExchangeType != "" {
		mainUsers = mainUsers.Where("exchange_type =?", req.ExchangeType)
	}

	if err := mainUsers.Find(list).Error; err != nil {
		e.Log.Errorf("LineApiUserService error:%s \r\n", err)
		return err
	}
	return nil
}
// InitCache loads every LineApiUser row and writes each one to the Redis
// cache through saveCache. Returns the database error, if any, after logging
// it; individual cache write failures are only logged by saveCache.
func (e *LineApiUser) InitCache() error {
	var items []models.LineApiUser

	err := e.Orm.Model(&models.LineApiUser{}).Find(&items).Error
	if err != nil {
		e.Log.Errorf("LineApiUserService error:%s \r\n", err)
		return err
	}

	for i := range items {
		e.saveCache(items[i])
	}
	return nil
}
// GetActiveApis returns the subset of apiIds whose LineApiUser row has
// open_status = 1. The returned slice is always non-nil; on a database error
// it is returned together with that error.
func (e *LineApiUser) GetActiveApis(apiIds []int) ([]int, error) {
	activeIDs := make([]int, 0)

	err := e.Orm.Model(&models.LineApiUser{}).
		Where("open_status = 1 and id IN ?", apiIds).
		Pluck("id", &activeIDs).Error

	return activeIDs, err
}