87 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			87 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package serverinit
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"go-admin/app/admin/models"
 | 
						|
	"go-admin/app/admin/service"
 | 
						|
	"go-admin/common/const/rediskey"
 | 
						|
	"go-admin/common/global"
 | 
						|
	"go-admin/common/helper"
 | 
						|
	"go-admin/pkg/utility"
 | 
						|
	"go-admin/services/excservice"
 | 
						|
	"os"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/bytedance/sonic"
 | 
						|
	log "github.com/go-admin-team/go-admin-core/logger"
 | 
						|
	"gorm.io/gorm"
 | 
						|
)
 | 
						|
 | 
						|
// 初始化用户订阅ws连接
 | 
						|
func UserSubscribeInit(orm *gorm.DB, ctx context.Context) {
 | 
						|
	var list []models.LineApiUser
 | 
						|
 | 
						|
	err := orm.Model(&models.LineApiUser{}).Where("open_status=1 AND exchange_type =?", global.EXCHANGE_BINANCE).Find(&list).Error
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		log.Error("获取用户api失败", err)
 | 
						|
		os.Exit(1)
 | 
						|
	}
 | 
						|
 | 
						|
	for _, item := range list {
 | 
						|
		service.OpenUserBinanceWebsocket(item)
 | 
						|
	}
 | 
						|
 | 
						|
	utility.SafeGo(func() {
 | 
						|
		ticker := time.NewTicker(time.Second * 5)
 | 
						|
		defer ticker.Stop()
 | 
						|
 | 
						|
		for {
 | 
						|
			select {
 | 
						|
			case <-ctx.Done():
 | 
						|
				log.Info("ctx.Done() 被触发,原因:", ctx.Err()) // 打印上下文错误信息
 | 
						|
				return
 | 
						|
			case <-ticker.C:
 | 
						|
				deleteKeys, _ := helper.DefaultRedis.GetAllList(rediskey.ApiUserDeleteList)
 | 
						|
				activeApis, _ := helper.DefaultRedis.GetAllList(rediskey.ApiUserActiveList)
 | 
						|
 | 
						|
				//移除连接
 | 
						|
				for _, item := range deleteKeys {
 | 
						|
					if wm, ok := excservice.SpotSockets[item]; ok {
 | 
						|
						wm.Stop()
 | 
						|
 | 
						|
						delete(excservice.FutureSockets, item)
 | 
						|
					}
 | 
						|
 | 
						|
					if wm, ok := excservice.SpotSockets[item]; ok {
 | 
						|
						wm.Stop()
 | 
						|
 | 
						|
						delete(excservice.SpotSockets, item)
 | 
						|
					}
 | 
						|
 | 
						|
					if _, err := helper.DefaultRedis.LRem(rediskey.ApiUserDeleteList, item); err != nil {
 | 
						|
						log.Error("移除 待关闭websocket 失败:", err)
 | 
						|
					}
 | 
						|
				}
 | 
						|
 | 
						|
				//连接websocket
 | 
						|
				var apiUser models.LineApiUser
 | 
						|
				for _, item := range activeApis {
 | 
						|
					if item == "" {
 | 
						|
						continue
 | 
						|
					}
 | 
						|
 | 
						|
					sonic.Unmarshal([]byte(item), &apiUser)
 | 
						|
 | 
						|
					if apiUser.Id > 0 {
 | 
						|
						service.OpenUserBinanceWebsocket(apiUser)
 | 
						|
						if _, err := helper.DefaultRedis.LRem(rediskey.ApiUserActiveList, item); err != nil {
 | 
						|
							log.Error("删除 待触发的apiUser失败", err)
 | 
						|
						}
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	})
 | 
						|
}
 |