Files
exchange_go/services/binanceservice/transaction_manager_optimized.go
2025-08-27 14:54:03 +08:00

457 lines
14 KiB
Go

package binanceservice
import (
"context"
"fmt"
"strings"
"time"
"github.com/go-admin-team/go-admin-core/logger"
"gorm.io/gorm"
)
// TransactionManager 事务管理器
type TransactionManager struct {
db *gorm.DB
metrics *MetricsCollector
config *OptimizedConfig
}
// TransactionConfig 事务配置
type TransactionConfig struct {
Timeout time.Duration `json:"timeout"`
RetryAttempts int `json:"retry_attempts"`
RetryDelay time.Duration `json:"retry_delay"`
IsolationLevel string `json:"isolation_level"`
ReadOnly bool `json:"read_only"`
}
// TransactionContext 事务上下文
type TransactionContext struct {
Tx *gorm.DB
StartTime time.Time
Config TransactionConfig
Context context.Context
}
// TransactionResult 事务结果
type TransactionResult struct {
Success bool `json:"success"`
Duration time.Duration `json:"duration"`
Error error `json:"error"`
Retries int `json:"retries"`
Timestamp time.Time `json:"timestamp"`
}
// NewTransactionManager 创建事务管理器
func NewTransactionManager(db *gorm.DB, metrics *MetricsCollector, config *OptimizedConfig) *TransactionManager {
return &TransactionManager{
db: db,
metrics: metrics,
config: config,
}
}
// WithTransaction 在事务中执行操作
func (tm *TransactionManager) WithTransaction(ctx context.Context, config TransactionConfig, operation func(*TransactionContext) error) *TransactionResult {
startTime := time.Now()
result := &TransactionResult{
Timestamp: startTime,
}
// 设置默认配置
if config.Timeout == 0 {
config.Timeout = 30 * time.Second // 默认事务超时时间
}
if config.RetryAttempts == 0 {
config.RetryAttempts = tm.config.RetryConfig.MaxRetries
}
if config.RetryDelay == 0 {
config.RetryDelay = tm.config.RetryConfig.RetryDelay
}
// 执行事务(带重试)
for attempt := 1; attempt <= config.RetryAttempts; attempt++ {
err := tm.executeTransaction(ctx, config, operation)
if err == nil {
result.Success = true
result.Duration = time.Since(startTime)
result.Retries = attempt - 1
break
}
result.Error = err
result.Retries = attempt
// 检查是否可重试
if !tm.isRetryableError(err) || attempt == config.RetryAttempts {
break
}
// 等待重试
logger.Warnf("事务执行失败,第 %d 次重试: %v", attempt, err)
time.Sleep(config.RetryDelay * time.Duration(attempt))
}
result.Duration = time.Since(startTime)
// 记录指标
if tm.metrics != nil {
tm.metrics.RecordDbOperation(true, result.Duration, result.Error)
}
return result
}
// executeTransaction 执行单次事务
func (tm *TransactionManager) executeTransaction(ctx context.Context, config TransactionConfig, operation func(*TransactionContext) error) error {
// 设置事务超时
txCtx := ctx
if config.Timeout > 0 {
var cancel context.CancelFunc
txCtx, cancel = context.WithTimeout(ctx, config.Timeout)
defer cancel()
}
// 开始事务
tx := tm.db.WithContext(txCtx).Begin()
if tx.Error != nil {
return fmt.Errorf("开始事务失败: %w", tx.Error)
}
// 设置事务隔离级别
if config.IsolationLevel != "" {
if err := tx.Exec(fmt.Sprintf("SET TRANSACTION ISOLATION LEVEL %s", config.IsolationLevel)).Error; err != nil {
tx.Rollback()
return fmt.Errorf("设置事务隔离级别失败: %w", err)
}
}
// 设置只读模式
if config.ReadOnly {
if err := tx.Exec("SET TRANSACTION READ ONLY").Error; err != nil {
tx.Rollback()
return fmt.Errorf("设置只读事务失败: %w", err)
}
}
// 创建事务上下文
txContext := &TransactionContext{
Tx: tx,
StartTime: time.Now(),
Config: config,
Context: txCtx,
}
// 执行操作
err := operation(txContext)
if err != nil {
// 回滚事务
if rollbackErr := tx.Rollback().Error; rollbackErr != nil {
logger.Errorf("事务回滚失败: %v", rollbackErr)
return fmt.Errorf("操作失败且回滚失败: %w, 回滚错误: %v", err, rollbackErr)
}
return fmt.Errorf("事务操作失败: %w", err)
}
// 提交事务
if commitErr := tx.Commit().Error; commitErr != nil {
return fmt.Errorf("事务提交失败: %w", commitErr)
}
return nil
}
// WithOptimisticLock 使用乐观锁执行操作
func (tm *TransactionManager) WithOptimisticLock(ctx context.Context, config TransactionConfig, operation func(*TransactionContext) error) *TransactionResult {
return tm.WithTransaction(ctx, config, func(txCtx *TransactionContext) error {
// 在乐观锁模式下,我们需要在操作中检查版本号
return operation(txCtx)
})
}
// SaveWithOptimisticLock 使用乐观锁保存数据
func (tm *TransactionManager) SaveWithOptimisticLock(ctx context.Context, model interface{}, condition string, args ...interface{}) error {
config := TransactionConfig{
Timeout: 30 * time.Second,
RetryAttempts: 3,
RetryDelay: 100 * time.Millisecond,
}
result := tm.WithTransaction(ctx, config, func(txCtx *TransactionContext) error {
// 使用 WHERE 条件和版本号进行更新
result := txCtx.Tx.Where(condition, args...).Save(model)
if result.Error != nil {
return result.Error
}
// 检查是否有行被更新
if result.RowsAffected == 0 {
return fmt.Errorf("乐观锁冲突: 数据已被其他进程修改")
}
return nil
})
return result.Error
}
// UpdateWithOptimisticLock 使用乐观锁更新数据
func (tm *TransactionManager) UpdateWithOptimisticLock(ctx context.Context, model interface{}, updates map[string]interface{}, condition string, args ...interface{}) error {
config := TransactionConfig{
Timeout: 30 * time.Second,
RetryAttempts: 3,
RetryDelay: 100 * time.Millisecond,
}
result := tm.WithTransaction(ctx, config, func(txCtx *TransactionContext) error {
// 增加版本号
if _, hasVersion := updates["version"]; !hasVersion {
updates["version"] = gorm.Expr("version + 1")
}
// 执行更新
result := txCtx.Tx.Model(model).Where(condition, args...).Updates(updates)
if result.Error != nil {
return result.Error
}
// 检查是否有行被更新
if result.RowsAffected == 0 {
return fmt.Errorf("乐观锁冲突: 数据已被其他进程修改")
}
return nil
})
return result.Error
}
// BatchOperation 批量操作
func (tm *TransactionManager) BatchOperation(ctx context.Context, config TransactionConfig, operations []func(*TransactionContext) error) *TransactionResult {
return tm.WithTransaction(ctx, config, func(txCtx *TransactionContext) error {
for i, operation := range operations {
if err := operation(txCtx); err != nil {
return fmt.Errorf("批量操作第 %d 步失败: %w", i+1, err)
}
}
return nil
})
}
// CreateSavepoint 创建保存点
func (tm *TransactionManager) CreateSavepoint(txCtx *TransactionContext, name string) error {
return txCtx.Tx.Exec(fmt.Sprintf("SAVEPOINT %s", name)).Error
}
// RollbackToSavepoint 回滚到保存点
func (tm *TransactionManager) RollbackToSavepoint(txCtx *TransactionContext, name string) error {
return txCtx.Tx.Exec(fmt.Sprintf("ROLLBACK TO SAVEPOINT %s", name)).Error
}
// ReleaseSavepoint 释放保存点
func (tm *TransactionManager) ReleaseSavepoint(txCtx *TransactionContext, name string) error {
return txCtx.Tx.Exec(fmt.Sprintf("RELEASE SAVEPOINT %s", name)).Error
}
// isRetryableError 判断错误是否可重试
func (tm *TransactionManager) isRetryableError(err error) bool {
if err == nil {
return false
}
errorMsg := err.Error()
// 可重试的错误类型
retryableErrors := []string{
"deadlock",
"lock wait timeout",
"connection",
"timeout",
"temporary",
"乐观锁冲突",
}
for _, retryableError := range retryableErrors {
if strings.Contains(errorMsg, retryableError) {
return true
}
}
return false
}
// GetTransactionStats 获取事务统计信息
func (tm *TransactionManager) GetTransactionStats() map[string]interface{} {
stats := make(map[string]interface{})
// 从指标收集器获取数据库相关统计
if tm.metrics != nil {
stats["total_transactions"] = tm.metrics.PerformanceMetrics.DbTransactions
stats["total_queries"] = tm.metrics.PerformanceMetrics.DbQueries
stats["total_errors"] = tm.metrics.PerformanceMetrics.DbErrors
stats["avg_query_time"] = tm.metrics.PerformanceMetrics.DbQueryTime
}
return stats
}
// 预定义的事务配置
// GetDefaultTransactionConfig 获取默认事务配置
func GetDefaultTransactionConfig() TransactionConfig {
return TransactionConfig{
Timeout: 30 * time.Second,
RetryAttempts: 3,
RetryDelay: 100 * time.Millisecond,
IsolationLevel: "READ COMMITTED",
ReadOnly: false,
}
}
// GetReadOnlyTransactionConfig 获取只读事务配置
func GetReadOnlyTransactionConfig() TransactionConfig {
return TransactionConfig{
Timeout: 10 * time.Second,
RetryAttempts: 1,
RetryDelay: 0,
IsolationLevel: "READ COMMITTED",
ReadOnly: true,
}
}
// GetLongRunningTransactionConfig 获取长时间运行事务配置
func GetLongRunningTransactionConfig() TransactionConfig {
return TransactionConfig{
Timeout: 300 * time.Second, // 5分钟
RetryAttempts: 5,
RetryDelay: 500 * time.Millisecond,
IsolationLevel: "REPEATABLE READ",
ReadOnly: false,
}
}
// GetCriticalTransactionConfig 获取关键事务配置
func GetCriticalTransactionConfig() TransactionConfig {
return TransactionConfig{
Timeout: 60 * time.Second,
RetryAttempts: 5,
RetryDelay: 200 * time.Millisecond,
IsolationLevel: "SERIALIZABLE",
ReadOnly: false,
}
}
// 全局事务管理器实例
var GlobalTransactionManager *TransactionManager
// InitTransactionManager 初始化事务管理器
func InitTransactionManager(db *gorm.DB, metrics *MetricsCollector, config *OptimizedConfig) {
GlobalTransactionManager = NewTransactionManager(db, metrics, config)
}
// GetTransactionManager 获取全局事务管理器
func GetTransactionManager() *TransactionManager {
return GlobalTransactionManager
}
// WithDefaultTransaction 使用默认配置执行事务
func WithDefaultTransaction(ctx context.Context, operation func(*TransactionContext) error) error {
if GlobalTransactionManager == nil {
return fmt.Errorf("事务管理器未初始化")
}
config := GetDefaultTransactionConfig()
result := GlobalTransactionManager.WithTransaction(ctx, config, operation)
return result.Error
}
// WithReadOnlyTransaction 使用只读配置执行事务
func WithReadOnlyTransaction(ctx context.Context, operation func(*TransactionContext) error) error {
if GlobalTransactionManager == nil {
return fmt.Errorf("事务管理器未初始化")
}
config := GetReadOnlyTransactionConfig()
result := GlobalTransactionManager.WithTransaction(ctx, config, operation)
return result.Error
}
// WithCriticalTransaction 使用关键事务配置执行事务
func WithCriticalTransaction(ctx context.Context, operation func(*TransactionContext) error) error {
if GlobalTransactionManager == nil {
return fmt.Errorf("事务管理器未初始化")
}
config := GetCriticalTransactionConfig()
result := GlobalTransactionManager.WithTransaction(ctx, config, operation)
return result.Error
}
// SaveWithOptimisticLockGlobal 全局乐观锁保存
func SaveWithOptimisticLockGlobal(ctx context.Context, model interface{}, condition string, args ...interface{}) error {
if GlobalTransactionManager == nil {
return fmt.Errorf("事务管理器未初始化")
}
return GlobalTransactionManager.SaveWithOptimisticLock(ctx, model, condition, args...)
}
// UpdateWithOptimisticLockGlobal 全局乐观锁更新
func UpdateWithOptimisticLockGlobal(ctx context.Context, model interface{}, updates map[string]interface{}, condition string, args ...interface{}) error {
if GlobalTransactionManager == nil {
return fmt.Errorf("事务管理器未初始化")
}
return GlobalTransactionManager.UpdateWithOptimisticLock(ctx, model, updates, condition, args...)
}
// TransactionHelper 事务辅助函数集合
type TransactionHelper struct {
tm *TransactionManager
}
// NewTransactionHelper 创建事务辅助器
func NewTransactionHelper(tm *TransactionManager) *TransactionHelper {
return &TransactionHelper{tm: tm}
}
// SavePosition 保存持仓(带乐观锁)
func (th *TransactionHelper) SavePosition(ctx context.Context, position interface{}, userID, symbol string, version int) error {
condition := "user_id = ? AND symbol = ? AND version = ?"
return th.tm.SaveWithOptimisticLock(ctx, position, condition, userID, symbol, version)
}
// SaveOrder 保存订单(带乐观锁)
func (th *TransactionHelper) SaveOrder(ctx context.Context, order interface{}, orderID string, version int) error {
condition := "order_id = ? AND version = ?"
return th.tm.SaveWithOptimisticLock(ctx, order, condition, orderID, version)
}
// UpdateOrderStatus 更新订单状态(带乐观锁)
func (th *TransactionHelper) UpdateOrderStatus(ctx context.Context, orderID, newStatus string, version int) error {
updates := map[string]interface{}{
"status": newStatus,
"updated_at": time.Now(),
}
condition := "order_id = ? AND version = ?"
return th.tm.UpdateWithOptimisticLock(ctx, nil, updates, condition, orderID, version)
}
// UpdatePositionQuantity 更新持仓数量(带乐观锁)
func (th *TransactionHelper) UpdatePositionQuantity(ctx context.Context, userID, symbol string, quantity float64, version int) error {
updates := map[string]interface{}{
"quantity": quantity,
"updated_at": time.Now(),
}
condition := "user_id = ? AND symbol = ? AND version = ?"
return th.tm.UpdateWithOptimisticLock(ctx, nil, updates, condition, userID, symbol, version)
}
// GetGlobalTransactionHelper 获取全局事务辅助器
func GetGlobalTransactionHelper() *TransactionHelper {
if GlobalTransactionManager == nil {
return nil
}
return NewTransactionHelper(GlobalTransactionManager)
}