457 lines
14 KiB
Go
457 lines
14 KiB
Go
|
|
package binanceservice
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"fmt"
|
||
|
|
"strings"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/go-admin-team/go-admin-core/logger"
|
||
|
|
"gorm.io/gorm"
|
||
|
|
)
|
||
|
|
|
||
|
|
// TransactionManager 事务管理器
|
||
|
|
type TransactionManager struct {
|
||
|
|
db *gorm.DB
|
||
|
|
metrics *MetricsCollector
|
||
|
|
config *OptimizedConfig
|
||
|
|
}
|
||
|
|
|
||
|
|
// TransactionConfig 事务配置
|
||
|
|
type TransactionConfig struct {
|
||
|
|
Timeout time.Duration `json:"timeout"`
|
||
|
|
RetryAttempts int `json:"retry_attempts"`
|
||
|
|
RetryDelay time.Duration `json:"retry_delay"`
|
||
|
|
IsolationLevel string `json:"isolation_level"`
|
||
|
|
ReadOnly bool `json:"read_only"`
|
||
|
|
}
|
||
|
|
|
||
|
|
// TransactionContext 事务上下文
|
||
|
|
type TransactionContext struct {
|
||
|
|
Tx *gorm.DB
|
||
|
|
StartTime time.Time
|
||
|
|
Config TransactionConfig
|
||
|
|
Context context.Context
|
||
|
|
}
|
||
|
|
|
||
|
|
// TransactionResult 事务结果
|
||
|
|
type TransactionResult struct {
|
||
|
|
Success bool `json:"success"`
|
||
|
|
Duration time.Duration `json:"duration"`
|
||
|
|
Error error `json:"error"`
|
||
|
|
Retries int `json:"retries"`
|
||
|
|
Timestamp time.Time `json:"timestamp"`
|
||
|
|
}
|
||
|
|
|
||
|
|
// NewTransactionManager 创建事务管理器
|
||
|
|
func NewTransactionManager(db *gorm.DB, metrics *MetricsCollector, config *OptimizedConfig) *TransactionManager {
|
||
|
|
return &TransactionManager{
|
||
|
|
db: db,
|
||
|
|
metrics: metrics,
|
||
|
|
config: config,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// WithTransaction 在事务中执行操作
|
||
|
|
func (tm *TransactionManager) WithTransaction(ctx context.Context, config TransactionConfig, operation func(*TransactionContext) error) *TransactionResult {
|
||
|
|
startTime := time.Now()
|
||
|
|
result := &TransactionResult{
|
||
|
|
Timestamp: startTime,
|
||
|
|
}
|
||
|
|
|
||
|
|
// 设置默认配置
|
||
|
|
if config.Timeout == 0 {
|
||
|
|
config.Timeout = 30 * time.Second // 默认事务超时时间
|
||
|
|
}
|
||
|
|
if config.RetryAttempts == 0 {
|
||
|
|
config.RetryAttempts = tm.config.RetryConfig.MaxRetries
|
||
|
|
}
|
||
|
|
if config.RetryDelay == 0 {
|
||
|
|
config.RetryDelay = tm.config.RetryConfig.RetryDelay
|
||
|
|
}
|
||
|
|
|
||
|
|
// 执行事务(带重试)
|
||
|
|
for attempt := 1; attempt <= config.RetryAttempts; attempt++ {
|
||
|
|
err := tm.executeTransaction(ctx, config, operation)
|
||
|
|
if err == nil {
|
||
|
|
result.Success = true
|
||
|
|
result.Duration = time.Since(startTime)
|
||
|
|
result.Retries = attempt - 1
|
||
|
|
break
|
||
|
|
}
|
||
|
|
|
||
|
|
result.Error = err
|
||
|
|
result.Retries = attempt
|
||
|
|
|
||
|
|
// 检查是否可重试
|
||
|
|
if !tm.isRetryableError(err) || attempt == config.RetryAttempts {
|
||
|
|
break
|
||
|
|
}
|
||
|
|
|
||
|
|
// 等待重试
|
||
|
|
logger.Warnf("事务执行失败,第 %d 次重试: %v", attempt, err)
|
||
|
|
time.Sleep(config.RetryDelay * time.Duration(attempt))
|
||
|
|
}
|
||
|
|
|
||
|
|
result.Duration = time.Since(startTime)
|
||
|
|
|
||
|
|
// 记录指标
|
||
|
|
if tm.metrics != nil {
|
||
|
|
tm.metrics.RecordDbOperation(true, result.Duration, result.Error)
|
||
|
|
}
|
||
|
|
|
||
|
|
return result
|
||
|
|
}
|
||
|
|
|
||
|
|
// executeTransaction 执行单次事务
|
||
|
|
func (tm *TransactionManager) executeTransaction(ctx context.Context, config TransactionConfig, operation func(*TransactionContext) error) error {
|
||
|
|
// 设置事务超时
|
||
|
|
txCtx := ctx
|
||
|
|
if config.Timeout > 0 {
|
||
|
|
var cancel context.CancelFunc
|
||
|
|
txCtx, cancel = context.WithTimeout(ctx, config.Timeout)
|
||
|
|
defer cancel()
|
||
|
|
}
|
||
|
|
|
||
|
|
// 开始事务
|
||
|
|
tx := tm.db.WithContext(txCtx).Begin()
|
||
|
|
if tx.Error != nil {
|
||
|
|
return fmt.Errorf("开始事务失败: %w", tx.Error)
|
||
|
|
}
|
||
|
|
|
||
|
|
// 设置事务隔离级别
|
||
|
|
if config.IsolationLevel != "" {
|
||
|
|
if err := tx.Exec(fmt.Sprintf("SET TRANSACTION ISOLATION LEVEL %s", config.IsolationLevel)).Error; err != nil {
|
||
|
|
tx.Rollback()
|
||
|
|
return fmt.Errorf("设置事务隔离级别失败: %w", err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// 设置只读模式
|
||
|
|
if config.ReadOnly {
|
||
|
|
if err := tx.Exec("SET TRANSACTION READ ONLY").Error; err != nil {
|
||
|
|
tx.Rollback()
|
||
|
|
return fmt.Errorf("设置只读事务失败: %w", err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// 创建事务上下文
|
||
|
|
txContext := &TransactionContext{
|
||
|
|
Tx: tx,
|
||
|
|
StartTime: time.Now(),
|
||
|
|
Config: config,
|
||
|
|
Context: txCtx,
|
||
|
|
}
|
||
|
|
|
||
|
|
// 执行操作
|
||
|
|
err := operation(txContext)
|
||
|
|
if err != nil {
|
||
|
|
// 回滚事务
|
||
|
|
if rollbackErr := tx.Rollback().Error; rollbackErr != nil {
|
||
|
|
logger.Errorf("事务回滚失败: %v", rollbackErr)
|
||
|
|
return fmt.Errorf("操作失败且回滚失败: %w, 回滚错误: %v", err, rollbackErr)
|
||
|
|
}
|
||
|
|
return fmt.Errorf("事务操作失败: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// 提交事务
|
||
|
|
if commitErr := tx.Commit().Error; commitErr != nil {
|
||
|
|
return fmt.Errorf("事务提交失败: %w", commitErr)
|
||
|
|
}
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// WithOptimisticLock 使用乐观锁执行操作
|
||
|
|
func (tm *TransactionManager) WithOptimisticLock(ctx context.Context, config TransactionConfig, operation func(*TransactionContext) error) *TransactionResult {
|
||
|
|
return tm.WithTransaction(ctx, config, func(txCtx *TransactionContext) error {
|
||
|
|
// 在乐观锁模式下,我们需要在操作中检查版本号
|
||
|
|
return operation(txCtx)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
// SaveWithOptimisticLock 使用乐观锁保存数据
|
||
|
|
func (tm *TransactionManager) SaveWithOptimisticLock(ctx context.Context, model interface{}, condition string, args ...interface{}) error {
|
||
|
|
config := TransactionConfig{
|
||
|
|
Timeout: 30 * time.Second,
|
||
|
|
RetryAttempts: 3,
|
||
|
|
RetryDelay: 100 * time.Millisecond,
|
||
|
|
}
|
||
|
|
|
||
|
|
result := tm.WithTransaction(ctx, config, func(txCtx *TransactionContext) error {
|
||
|
|
// 使用 WHERE 条件和版本号进行更新
|
||
|
|
result := txCtx.Tx.Where(condition, args...).Save(model)
|
||
|
|
if result.Error != nil {
|
||
|
|
return result.Error
|
||
|
|
}
|
||
|
|
|
||
|
|
// 检查是否有行被更新
|
||
|
|
if result.RowsAffected == 0 {
|
||
|
|
return fmt.Errorf("乐观锁冲突: 数据已被其他进程修改")
|
||
|
|
}
|
||
|
|
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
|
||
|
|
return result.Error
|
||
|
|
}
|
||
|
|
|
||
|
|
// UpdateWithOptimisticLock 使用乐观锁更新数据
|
||
|
|
func (tm *TransactionManager) UpdateWithOptimisticLock(ctx context.Context, model interface{}, updates map[string]interface{}, condition string, args ...interface{}) error {
|
||
|
|
config := TransactionConfig{
|
||
|
|
Timeout: 30 * time.Second,
|
||
|
|
RetryAttempts: 3,
|
||
|
|
RetryDelay: 100 * time.Millisecond,
|
||
|
|
}
|
||
|
|
|
||
|
|
result := tm.WithTransaction(ctx, config, func(txCtx *TransactionContext) error {
|
||
|
|
// 增加版本号
|
||
|
|
if _, hasVersion := updates["version"]; !hasVersion {
|
||
|
|
updates["version"] = gorm.Expr("version + 1")
|
||
|
|
}
|
||
|
|
|
||
|
|
// 执行更新
|
||
|
|
result := txCtx.Tx.Model(model).Where(condition, args...).Updates(updates)
|
||
|
|
if result.Error != nil {
|
||
|
|
return result.Error
|
||
|
|
}
|
||
|
|
|
||
|
|
// 检查是否有行被更新
|
||
|
|
if result.RowsAffected == 0 {
|
||
|
|
return fmt.Errorf("乐观锁冲突: 数据已被其他进程修改")
|
||
|
|
}
|
||
|
|
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
|
||
|
|
return result.Error
|
||
|
|
}
|
||
|
|
|
||
|
|
// BatchOperation 批量操作
|
||
|
|
func (tm *TransactionManager) BatchOperation(ctx context.Context, config TransactionConfig, operations []func(*TransactionContext) error) *TransactionResult {
|
||
|
|
return tm.WithTransaction(ctx, config, func(txCtx *TransactionContext) error {
|
||
|
|
for i, operation := range operations {
|
||
|
|
if err := operation(txCtx); err != nil {
|
||
|
|
return fmt.Errorf("批量操作第 %d 步失败: %w", i+1, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
// CreateSavepoint 创建保存点
|
||
|
|
func (tm *TransactionManager) CreateSavepoint(txCtx *TransactionContext, name string) error {
|
||
|
|
return txCtx.Tx.Exec(fmt.Sprintf("SAVEPOINT %s", name)).Error
|
||
|
|
}
|
||
|
|
|
||
|
|
// RollbackToSavepoint 回滚到保存点
|
||
|
|
func (tm *TransactionManager) RollbackToSavepoint(txCtx *TransactionContext, name string) error {
|
||
|
|
return txCtx.Tx.Exec(fmt.Sprintf("ROLLBACK TO SAVEPOINT %s", name)).Error
|
||
|
|
}
|
||
|
|
|
||
|
|
// ReleaseSavepoint 释放保存点
|
||
|
|
func (tm *TransactionManager) ReleaseSavepoint(txCtx *TransactionContext, name string) error {
|
||
|
|
return txCtx.Tx.Exec(fmt.Sprintf("RELEASE SAVEPOINT %s", name)).Error
|
||
|
|
}
|
||
|
|
|
||
|
|
// isRetryableError 判断错误是否可重试
|
||
|
|
func (tm *TransactionManager) isRetryableError(err error) bool {
|
||
|
|
if err == nil {
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
|
||
|
|
errorMsg := err.Error()
|
||
|
|
|
||
|
|
// 可重试的错误类型
|
||
|
|
retryableErrors := []string{
|
||
|
|
"deadlock",
|
||
|
|
"lock wait timeout",
|
||
|
|
"connection",
|
||
|
|
"timeout",
|
||
|
|
"temporary",
|
||
|
|
"乐观锁冲突",
|
||
|
|
}
|
||
|
|
|
||
|
|
for _, retryableError := range retryableErrors {
|
||
|
|
if strings.Contains(errorMsg, retryableError) {
|
||
|
|
return true
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
|
||
|
|
// GetTransactionStats 获取事务统计信息
|
||
|
|
func (tm *TransactionManager) GetTransactionStats() map[string]interface{} {
|
||
|
|
stats := make(map[string]interface{})
|
||
|
|
|
||
|
|
// 从指标收集器获取数据库相关统计
|
||
|
|
if tm.metrics != nil {
|
||
|
|
stats["total_transactions"] = tm.metrics.PerformanceMetrics.DbTransactions
|
||
|
|
stats["total_queries"] = tm.metrics.PerformanceMetrics.DbQueries
|
||
|
|
stats["total_errors"] = tm.metrics.PerformanceMetrics.DbErrors
|
||
|
|
stats["avg_query_time"] = tm.metrics.PerformanceMetrics.DbQueryTime
|
||
|
|
}
|
||
|
|
|
||
|
|
return stats
|
||
|
|
}
|
||
|
|
|
||
|
|
// 预定义的事务配置
|
||
|
|
|
||
|
|
// GetDefaultTransactionConfig 获取默认事务配置
|
||
|
|
func GetDefaultTransactionConfig() TransactionConfig {
|
||
|
|
return TransactionConfig{
|
||
|
|
Timeout: 30 * time.Second,
|
||
|
|
RetryAttempts: 3,
|
||
|
|
RetryDelay: 100 * time.Millisecond,
|
||
|
|
IsolationLevel: "READ COMMITTED",
|
||
|
|
ReadOnly: false,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// GetReadOnlyTransactionConfig 获取只读事务配置
|
||
|
|
func GetReadOnlyTransactionConfig() TransactionConfig {
|
||
|
|
return TransactionConfig{
|
||
|
|
Timeout: 10 * time.Second,
|
||
|
|
RetryAttempts: 1,
|
||
|
|
RetryDelay: 0,
|
||
|
|
IsolationLevel: "READ COMMITTED",
|
||
|
|
ReadOnly: true,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// GetLongRunningTransactionConfig 获取长时间运行事务配置
|
||
|
|
func GetLongRunningTransactionConfig() TransactionConfig {
|
||
|
|
return TransactionConfig{
|
||
|
|
Timeout: 300 * time.Second, // 5分钟
|
||
|
|
RetryAttempts: 5,
|
||
|
|
RetryDelay: 500 * time.Millisecond,
|
||
|
|
IsolationLevel: "REPEATABLE READ",
|
||
|
|
ReadOnly: false,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// GetCriticalTransactionConfig 获取关键事务配置
|
||
|
|
func GetCriticalTransactionConfig() TransactionConfig {
|
||
|
|
return TransactionConfig{
|
||
|
|
Timeout: 60 * time.Second,
|
||
|
|
RetryAttempts: 5,
|
||
|
|
RetryDelay: 200 * time.Millisecond,
|
||
|
|
IsolationLevel: "SERIALIZABLE",
|
||
|
|
ReadOnly: false,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// 全局事务管理器实例
|
||
|
|
var GlobalTransactionManager *TransactionManager
|
||
|
|
|
||
|
|
// InitTransactionManager 初始化事务管理器
|
||
|
|
func InitTransactionManager(db *gorm.DB, metrics *MetricsCollector, config *OptimizedConfig) {
|
||
|
|
GlobalTransactionManager = NewTransactionManager(db, metrics, config)
|
||
|
|
}
|
||
|
|
|
||
|
|
// GetTransactionManager 获取全局事务管理器
|
||
|
|
func GetTransactionManager() *TransactionManager {
|
||
|
|
return GlobalTransactionManager
|
||
|
|
}
|
||
|
|
|
||
|
|
// WithDefaultTransaction 使用默认配置执行事务
|
||
|
|
func WithDefaultTransaction(ctx context.Context, operation func(*TransactionContext) error) error {
|
||
|
|
if GlobalTransactionManager == nil {
|
||
|
|
return fmt.Errorf("事务管理器未初始化")
|
||
|
|
}
|
||
|
|
|
||
|
|
config := GetDefaultTransactionConfig()
|
||
|
|
result := GlobalTransactionManager.WithTransaction(ctx, config, operation)
|
||
|
|
return result.Error
|
||
|
|
}
|
||
|
|
|
||
|
|
// WithReadOnlyTransaction 使用只读配置执行事务
|
||
|
|
func WithReadOnlyTransaction(ctx context.Context, operation func(*TransactionContext) error) error {
|
||
|
|
if GlobalTransactionManager == nil {
|
||
|
|
return fmt.Errorf("事务管理器未初始化")
|
||
|
|
}
|
||
|
|
|
||
|
|
config := GetReadOnlyTransactionConfig()
|
||
|
|
result := GlobalTransactionManager.WithTransaction(ctx, config, operation)
|
||
|
|
return result.Error
|
||
|
|
}
|
||
|
|
|
||
|
|
// WithCriticalTransaction 使用关键事务配置执行事务
|
||
|
|
func WithCriticalTransaction(ctx context.Context, operation func(*TransactionContext) error) error {
|
||
|
|
if GlobalTransactionManager == nil {
|
||
|
|
return fmt.Errorf("事务管理器未初始化")
|
||
|
|
}
|
||
|
|
|
||
|
|
config := GetCriticalTransactionConfig()
|
||
|
|
result := GlobalTransactionManager.WithTransaction(ctx, config, operation)
|
||
|
|
return result.Error
|
||
|
|
}
|
||
|
|
|
||
|
|
// SaveWithOptimisticLockGlobal 全局乐观锁保存
|
||
|
|
func SaveWithOptimisticLockGlobal(ctx context.Context, model interface{}, condition string, args ...interface{}) error {
|
||
|
|
if GlobalTransactionManager == nil {
|
||
|
|
return fmt.Errorf("事务管理器未初始化")
|
||
|
|
}
|
||
|
|
|
||
|
|
return GlobalTransactionManager.SaveWithOptimisticLock(ctx, model, condition, args...)
|
||
|
|
}
|
||
|
|
|
||
|
|
// UpdateWithOptimisticLockGlobal 全局乐观锁更新
|
||
|
|
func UpdateWithOptimisticLockGlobal(ctx context.Context, model interface{}, updates map[string]interface{}, condition string, args ...interface{}) error {
|
||
|
|
if GlobalTransactionManager == nil {
|
||
|
|
return fmt.Errorf("事务管理器未初始化")
|
||
|
|
}
|
||
|
|
|
||
|
|
return GlobalTransactionManager.UpdateWithOptimisticLock(ctx, model, updates, condition, args...)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TransactionHelper 事务辅助函数集合
|
||
|
|
type TransactionHelper struct {
|
||
|
|
tm *TransactionManager
|
||
|
|
}
|
||
|
|
|
||
|
|
// NewTransactionHelper 创建事务辅助器
|
||
|
|
func NewTransactionHelper(tm *TransactionManager) *TransactionHelper {
|
||
|
|
return &TransactionHelper{tm: tm}
|
||
|
|
}
|
||
|
|
|
||
|
|
// SavePosition 保存持仓(带乐观锁)
|
||
|
|
func (th *TransactionHelper) SavePosition(ctx context.Context, position interface{}, userID, symbol string, version int) error {
|
||
|
|
condition := "user_id = ? AND symbol = ? AND version = ?"
|
||
|
|
return th.tm.SaveWithOptimisticLock(ctx, position, condition, userID, symbol, version)
|
||
|
|
}
|
||
|
|
|
||
|
|
// SaveOrder 保存订单(带乐观锁)
|
||
|
|
func (th *TransactionHelper) SaveOrder(ctx context.Context, order interface{}, orderID string, version int) error {
|
||
|
|
condition := "order_id = ? AND version = ?"
|
||
|
|
return th.tm.SaveWithOptimisticLock(ctx, order, condition, orderID, version)
|
||
|
|
}
|
||
|
|
|
||
|
|
// UpdateOrderStatus 更新订单状态(带乐观锁)
|
||
|
|
func (th *TransactionHelper) UpdateOrderStatus(ctx context.Context, orderID, newStatus string, version int) error {
|
||
|
|
updates := map[string]interface{}{
|
||
|
|
"status": newStatus,
|
||
|
|
"updated_at": time.Now(),
|
||
|
|
}
|
||
|
|
condition := "order_id = ? AND version = ?"
|
||
|
|
return th.tm.UpdateWithOptimisticLock(ctx, nil, updates, condition, orderID, version)
|
||
|
|
}
|
||
|
|
|
||
|
|
// UpdatePositionQuantity 更新持仓数量(带乐观锁)
|
||
|
|
func (th *TransactionHelper) UpdatePositionQuantity(ctx context.Context, userID, symbol string, quantity float64, version int) error {
|
||
|
|
updates := map[string]interface{}{
|
||
|
|
"quantity": quantity,
|
||
|
|
"updated_at": time.Now(),
|
||
|
|
}
|
||
|
|
condition := "user_id = ? AND symbol = ? AND version = ?"
|
||
|
|
return th.tm.UpdateWithOptimisticLock(ctx, nil, updates, condition, userID, symbol, version)
|
||
|
|
}
|
||
|
|
|
||
|
|
// GetGlobalTransactionHelper 获取全局事务辅助器
|
||
|
|
func GetGlobalTransactionHelper() *TransactionHelper {
|
||
|
|
if GlobalTransactionManager == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
return NewTransactionHelper(GlobalTransactionManager)
|
||
|
|
}
|