676 lines
15 KiB
Go
676 lines
15 KiB
Go
package binanceservice
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"net"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/go-admin-team/go-admin-core/logger"
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
// ErrorType is the coarse category ClassifyError assigns to an error. Its
// string value is what HandleError passes to the metrics collector.
type ErrorType string

// Error categories, listed roughly from infrastructure to domain level.
const (
	// ErrorTypeNetwork marks transport-level failures.
	ErrorTypeNetwork ErrorType = "network"
	// ErrorTypeDatabase marks GORM / SQL failures.
	ErrorTypeDatabase ErrorType = "database"
	// ErrorTypeBusiness marks trading-domain rejections.
	ErrorTypeBusiness ErrorType = "business"
	// ErrorTypeSystem is the fallback for anything unrecognized.
	ErrorTypeSystem ErrorType = "system"
	// ErrorTypeValidation marks malformed or rejected input.
	ErrorTypeValidation ErrorType = "validation"
	// ErrorTypeTimeout marks deadline / timeout failures.
	ErrorTypeTimeout ErrorType = "timeout"
	// ErrorTypeRateLimit marks throttling by the remote side.
	ErrorTypeRateLimit ErrorType = "rate_limit"
	// ErrorTypeAuth marks authentication / authorization failures.
	ErrorTypeAuth ErrorType = "auth"
)
|
||
|
||
// ErrorSeverity ranks how serious an error is. logError uses it to pick the
// log level: low -> Info, medium -> Warn, high/critical -> Error.
type ErrorSeverity string

// Severity levels, from least to most serious.
const (
	// SeverityLow is logged at Info level.
	SeverityLow ErrorSeverity = "low"
	// SeverityMedium is logged at Warn level.
	SeverityMedium ErrorSeverity = "medium"
	// SeverityHigh is logged at Error level.
	SeverityHigh ErrorSeverity = "high"
	// SeverityCritical is logged at Error level.
	SeverityCritical ErrorSeverity = "critical"
)
|
||
|
||
// ErrorAction is the follow-up recommended for a classified error.
// HandleError only acts on alert, circuit and degrade; the remaining actions
// are informational for the caller.
type ErrorAction string

// Follow-up actions.
const (
	// ActionRetry suggests the operation may be attempted again.
	ActionRetry ErrorAction = "retry"
	// ActionFallback suggests using an alternative path.
	ActionFallback ErrorAction = "fallback"
	// ActionAlert makes HandleError call sendAlert.
	ActionAlert ErrorAction = "alert"
	// ActionIgnore suggests the error can be dropped.
	ActionIgnore ErrorAction = "ignore"
	// ActionCircuit makes HandleError force the circuit breaker open.
	ActionCircuit ErrorAction = "circuit"
	// ActionDegrade makes HandleError call enableDegradedMode.
	ActionDegrade ErrorAction = "degrade"
)
|
||
|
||
// ErrorInfo is the classification result that ClassifyError builds for one
// error occurrence. Field order and JSON tags define its serialized form.
type ErrorInfo struct {
	// Type is the coarse error category.
	Type ErrorType `json:"type"`
	// Severity determines the log level used by logError.
	Severity ErrorSeverity `json:"severity"`
	// Message is the original error's Error() text.
	Message string `json:"message"`
	// OriginalErr is the classified error itself; it is excluded from JSON.
	OriginalErr error `json:"-"`
	// Context is the caller-supplied label passed to ClassifyError.
	Context string `json:"context"`
	// Timestamp is when ClassifyError ran.
	Timestamp time.Time `json:"timestamp"`
	// Retryable tells RetryWithBackoff whether another attempt may help.
	Retryable bool `json:"retryable"`
	// Action is the recommended follow-up (see HandleError).
	Action ErrorAction `json:"action"`
}
|
||
|
||
// RetryConfig and CircuitBreakerConfig are defined in config_optimized.go.

// ErrorHandler classifies errors, reports them to an optional metrics
// collector, retries operations with backoff, and keeps one circuit breaker
// per context string.
//
// NOTE(review): circuitBreakers is a plain map with no mutex, and the handler
// is shared through the package-level GlobalErrorHandler. Concurrent calls
// that create or mutate breakers would race (and can crash on concurrent map
// writes). Confirm callers serialize access, or guard this state with a lock.
type ErrorHandler struct {
	// config supplies CircuitBreakerConfig for newly created breakers.
	config *OptimizedConfig
	// metricsCollector receives error/retry metrics; nil disables reporting.
	metricsCollector *MetricsCollector
	// circuitBreakers maps a context string to its breaker, created lazily.
	circuitBreakers map[string]*CircuitBreaker
}
|
||
|
||
// CircuitBreaker holds the per-context breaker state that canExecute,
// recordSuccess and recordFailure read and update.
type CircuitBreaker struct {
	// name is the context string this breaker was created for.
	name string
	// config holds the enable flag, thresholds and open-state timeout.
	config CircuitBreakerConfig
	// state is closed, open or half-open.
	state CircuitState
	// failureCount counts failures since the last reset.
	failureCount int
	// successCount counts successes while half-open.
	successCount int
	// lastFailTime is when the most recent failure was recorded.
	lastFailTime time.Time
	// nextRetry is the earliest time an open breaker may go half-open.
	nextRetry time.Time
	// halfOpenCalls is compared with config.HalfOpenMaxCalls while half-open.
	// NOTE(review): nothing in this file increments it, so that limit never
	// throttles calls; confirm the intended design.
	halfOpenCalls int
}
|
||
|
||
// CircuitState is the state of a CircuitBreaker.
type CircuitState string

// Breaker states.
const (
	// StateClosed lets every call through.
	StateClosed CircuitState = "closed"
	// StateOpen rejects calls until the breaker's nextRetry time.
	StateOpen CircuitState = "open"
	// StateHalfOpen lets a limited number of probe calls through.
	StateHalfOpen CircuitState = "half_open"
)
|
||
|
||
// NewErrorHandler 创建错误处理器
|
||
func NewErrorHandler(config *OptimizedConfig, metricsCollector *MetricsCollector) *ErrorHandler {
|
||
return &ErrorHandler{
|
||
config: config,
|
||
metricsCollector: metricsCollector,
|
||
circuitBreakers: make(map[string]*CircuitBreaker),
|
||
}
|
||
}
|
||
|
||
// ClassifyError 分类错误
|
||
func (eh *ErrorHandler) ClassifyError(err error, context string) *ErrorInfo {
|
||
if err == nil {
|
||
return nil
|
||
}
|
||
|
||
errorInfo := &ErrorInfo{
|
||
OriginalErr: err,
|
||
Message: err.Error(),
|
||
Context: context,
|
||
Timestamp: time.Now(),
|
||
}
|
||
|
||
// 根据错误类型分类
|
||
switch {
|
||
case isNetworkError(err):
|
||
errorInfo.Type = ErrorTypeNetwork
|
||
errorInfo.Severity = SeverityMedium
|
||
errorInfo.Retryable = true
|
||
errorInfo.Action = ActionRetry
|
||
|
||
case isDatabaseError(err):
|
||
errorInfo.Type = ErrorTypeDatabase
|
||
errorInfo.Severity = SeverityHigh
|
||
errorInfo.Retryable = isRetryableDatabaseError(err)
|
||
errorInfo.Action = ActionRetry
|
||
|
||
case isTimeoutError(err):
|
||
errorInfo.Type = ErrorTypeTimeout
|
||
errorInfo.Severity = SeverityMedium
|
||
errorInfo.Retryable = true
|
||
errorInfo.Action = ActionRetry
|
||
|
||
case isRateLimitError(err):
|
||
errorInfo.Type = ErrorTypeRateLimit
|
||
errorInfo.Severity = SeverityMedium
|
||
errorInfo.Retryable = true
|
||
errorInfo.Action = ActionRetry
|
||
|
||
case isAuthError(err):
|
||
errorInfo.Type = ErrorTypeAuth
|
||
errorInfo.Severity = SeverityHigh
|
||
errorInfo.Retryable = false
|
||
errorInfo.Action = ActionAlert
|
||
|
||
case isValidationError(err):
|
||
errorInfo.Type = ErrorTypeValidation
|
||
errorInfo.Severity = SeverityLow
|
||
errorInfo.Retryable = false
|
||
errorInfo.Action = ActionIgnore
|
||
|
||
case isBusinessError(err):
|
||
errorInfo.Type = ErrorTypeBusiness
|
||
errorInfo.Severity = SeverityMedium
|
||
errorInfo.Retryable = false
|
||
errorInfo.Action = ActionFallback
|
||
|
||
default:
|
||
errorInfo.Type = ErrorTypeSystem
|
||
errorInfo.Severity = SeverityHigh
|
||
errorInfo.Retryable = true
|
||
errorInfo.Action = ActionRetry
|
||
}
|
||
|
||
return errorInfo
|
||
}
|
||
|
||
// HandleError 处理错误
|
||
func (eh *ErrorHandler) HandleError(err error, context string) *ErrorInfo {
|
||
errorInfo := eh.ClassifyError(err, context)
|
||
if errorInfo == nil {
|
||
return nil
|
||
}
|
||
|
||
// 记录错误指标
|
||
if eh.metricsCollector != nil {
|
||
eh.metricsCollector.RecordError(string(errorInfo.Type), errorInfo.Message, context)
|
||
}
|
||
|
||
// 根据错误动作处理
|
||
switch errorInfo.Action {
|
||
case ActionAlert:
|
||
eh.sendAlert(errorInfo)
|
||
case ActionCircuit:
|
||
eh.triggerCircuitBreaker(context)
|
||
case ActionDegrade:
|
||
eh.enableDegradedMode(context)
|
||
}
|
||
|
||
// 记录日志
|
||
eh.logError(errorInfo)
|
||
|
||
return errorInfo
|
||
}
|
||
|
||
// RetryWithBackoff 带退避的重试
|
||
func (eh *ErrorHandler) RetryWithBackoff(ctx context.Context, operation func() error, config RetryConfig, context string) error {
|
||
var lastErr error
|
||
delay := config.RetryDelay
|
||
|
||
for attempt := 1; attempt <= config.MaxRetries; attempt++ {
|
||
// 检查上下文是否已取消
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
default:
|
||
}
|
||
|
||
// 检查熔断器状态
|
||
if !eh.canExecute(context) {
|
||
return fmt.Errorf("circuit breaker is open for %s", context)
|
||
}
|
||
|
||
// 执行操作
|
||
// 注意:config中没有TimeoutPerTry字段,这里暂时注释掉
|
||
// if config.TimeoutPerTry > 0 {
|
||
// var cancel context.CancelFunc
|
||
// operationCtx, cancel = context.WithTimeout(ctx, config.TimeoutPerTry)
|
||
// defer cancel()
|
||
// }
|
||
|
||
err := operation()
|
||
if err == nil {
|
||
// 成功,记录成功指标
|
||
if eh.metricsCollector != nil {
|
||
eh.metricsCollector.RecordRetry(true)
|
||
}
|
||
eh.recordSuccess(context)
|
||
return nil
|
||
}
|
||
|
||
lastErr = err
|
||
errorInfo := eh.ClassifyError(err, context)
|
||
|
||
// 记录失败
|
||
eh.recordFailure(context)
|
||
|
||
// 如果不可重试,直接返回
|
||
if errorInfo != nil && !errorInfo.Retryable {
|
||
logger.Warnf("不可重试的错误 [%s]: %v", context, err)
|
||
return err
|
||
}
|
||
|
||
// 最后一次尝试,不再等待
|
||
if attempt == config.MaxRetries {
|
||
break
|
||
}
|
||
|
||
// 计算下次重试延迟
|
||
nextDelay := eh.calculateDelay(delay, config)
|
||
logger.Infof("重试 %d/%d [%s] 在 %v 后,错误: %v", attempt, config.MaxRetries, context, nextDelay, err)
|
||
|
||
// 等待重试
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
case <-time.After(nextDelay):
|
||
delay = nextDelay
|
||
}
|
||
}
|
||
|
||
// 记录重试失败指标
|
||
if eh.metricsCollector != nil {
|
||
eh.metricsCollector.RecordRetry(false)
|
||
}
|
||
|
||
return fmt.Errorf("重试 %d 次后仍然失败 [%s]: %w", config.MaxRetries, context, lastErr)
|
||
}
|
||
|
||
// calculateDelay 计算重试延迟
|
||
func (eh *ErrorHandler) calculateDelay(currentDelay time.Duration, config RetryConfig) time.Duration {
|
||
nextDelay := time.Duration(float64(currentDelay) * config.BackoffFactor)
|
||
|
||
// 限制最大延迟
|
||
if nextDelay > config.MaxRetryDelay {
|
||
nextDelay = config.MaxRetryDelay
|
||
}
|
||
|
||
// 添加10%抖动(简化版本,不依赖JitterEnabled字段)
|
||
jitter := time.Duration(float64(nextDelay) * 0.1)
|
||
if jitter > 0 {
|
||
nextDelay += time.Duration(time.Now().UnixNano() % int64(jitter))
|
||
}
|
||
|
||
return nextDelay
|
||
}
|
||
|
||
// canExecute 检查是否可以执行操作(熔断器检查)
|
||
func (eh *ErrorHandler) canExecute(context string) bool {
|
||
cb := eh.getCircuitBreaker(context)
|
||
if cb == nil || !cb.config.Enabled {
|
||
return true
|
||
}
|
||
|
||
now := time.Now()
|
||
|
||
switch cb.state {
|
||
case StateClosed:
|
||
return true
|
||
|
||
case StateOpen:
|
||
if now.After(cb.nextRetry) {
|
||
cb.state = StateHalfOpen
|
||
cb.halfOpenCalls = 0
|
||
return true
|
||
}
|
||
return false
|
||
|
||
case StateHalfOpen:
|
||
return cb.halfOpenCalls < cb.config.HalfOpenMaxCalls
|
||
|
||
default:
|
||
return true
|
||
}
|
||
}
|
||
|
||
// recordSuccess 记录成功
|
||
func (eh *ErrorHandler) recordSuccess(context string) {
|
||
cb := eh.getCircuitBreaker(context)
|
||
if cb == nil || !cb.config.Enabled {
|
||
return
|
||
}
|
||
|
||
switch cb.state {
|
||
case StateHalfOpen:
|
||
cb.successCount++
|
||
if cb.successCount >= cb.config.SuccessThreshold {
|
||
cb.state = StateClosed
|
||
cb.failureCount = 0
|
||
cb.successCount = 0
|
||
}
|
||
|
||
case StateClosed:
|
||
cb.failureCount = 0
|
||
}
|
||
}
|
||
|
||
// recordFailure 记录失败
|
||
func (eh *ErrorHandler) recordFailure(context string) {
|
||
cb := eh.getCircuitBreaker(context)
|
||
if cb == nil || !cb.config.Enabled {
|
||
return
|
||
}
|
||
|
||
cb.failureCount++
|
||
cb.lastFailTime = time.Now()
|
||
|
||
switch cb.state {
|
||
case StateClosed:
|
||
if cb.failureCount >= cb.config.FailureThreshold {
|
||
cb.state = StateOpen
|
||
cb.nextRetry = time.Now().Add(cb.config.Timeout)
|
||
logger.Warnf("熔断器开启 [%s]: 失败次数 %d", context, cb.failureCount)
|
||
}
|
||
|
||
case StateHalfOpen:
|
||
cb.state = StateOpen
|
||
cb.nextRetry = time.Now().Add(cb.config.Timeout)
|
||
cb.successCount = 0
|
||
logger.Warnf("熔断器重新开启 [%s]", context)
|
||
}
|
||
}
|
||
|
||
// getCircuitBreaker 获取熔断器
|
||
func (eh *ErrorHandler) getCircuitBreaker(context string) *CircuitBreaker {
|
||
cb, exists := eh.circuitBreakers[context]
|
||
if !exists {
|
||
cb = &CircuitBreaker{
|
||
name: context,
|
||
config: eh.config.CircuitBreakerConfig,
|
||
state: StateClosed,
|
||
}
|
||
eh.circuitBreakers[context] = cb
|
||
}
|
||
return cb
|
||
}
|
||
|
||
// triggerCircuitBreaker 触发熔断器
|
||
func (eh *ErrorHandler) triggerCircuitBreaker(context string) {
|
||
cb := eh.getCircuitBreaker(context)
|
||
if cb != nil && cb.config.Enabled {
|
||
cb.state = StateOpen
|
||
cb.nextRetry = time.Now().Add(cb.config.Timeout)
|
||
logger.Warnf("手动触发熔断器 [%s]", context)
|
||
}
|
||
}
|
||
|
||
// sendAlert 发送告警
|
||
func (eh *ErrorHandler) sendAlert(errorInfo *ErrorInfo) {
|
||
// 这里可以集成告警系统,如钉钉、邮件等
|
||
logger.Errorf("告警: [%s] %s - %s", errorInfo.Type, errorInfo.Context, errorInfo.Message)
|
||
}
|
||
|
||
// enableDegradedMode 启用降级模式
|
||
func (eh *ErrorHandler) enableDegradedMode(context string) {
|
||
logger.Warnf("启用降级模式 [%s]", context)
|
||
// 这里可以实现具体的降级逻辑
|
||
}
|
||
|
||
// logError 记录错误日志
|
||
func (eh *ErrorHandler) logError(errorInfo *ErrorInfo) {
|
||
switch errorInfo.Severity {
|
||
case SeverityLow:
|
||
logger.Infof("[%s] %s: %s", errorInfo.Type, errorInfo.Context, errorInfo.Message)
|
||
case SeverityMedium:
|
||
logger.Warnf("[%s] %s: %s", errorInfo.Type, errorInfo.Context, errorInfo.Message)
|
||
case SeverityHigh, SeverityCritical:
|
||
logger.Errorf("[%s] %s: %s", errorInfo.Type, errorInfo.Context, errorInfo.Message)
|
||
}
|
||
}
|
||
|
||
// GetCircuitBreakerStatus 获取熔断器状态
|
||
func (eh *ErrorHandler) GetCircuitBreakerStatus() map[string]interface{} {
|
||
status := make(map[string]interface{})
|
||
for name, cb := range eh.circuitBreakers {
|
||
status[name] = map[string]interface{}{
|
||
"state": cb.state,
|
||
"failure_count": cb.failureCount,
|
||
"success_count": cb.successCount,
|
||
"last_fail_time": cb.lastFailTime,
|
||
"next_retry": cb.nextRetry,
|
||
}
|
||
}
|
||
return status
|
||
}
|
||
|
||
// Error classification helpers. Each helper below reports whether an error
// belongs to one category, first by type checks and then by lower-cased
// message substrings.

// isNetworkError reports whether err is a network-level failure. That means
// either an error in its chain implements net.Error, or its message mentions a
// refused/reset/timed-out connection, an unreachable network or host, DNS, or
// sockets. A nil err is never a network error.
func isNetworkError(err error) bool {
	if err == nil {
		return false
	}

	var target net.Error
	if errors.As(err, &target) {
		return true
	}

	lowered := strings.ToLower(err.Error())
	needles := [...]string{
		"connection refused",
		"connection reset",
		"connection timeout",
		"network unreachable",
		"no route to host",
		"dns",
		"socket",
	}
	for i := range needles {
		if strings.Contains(lowered, needles[i]) {
			return true
		}
	}
	return false
}
|
||
|
||
// isDatabaseError 判断是否为数据库错误
|
||
func isDatabaseError(err error) bool {
|
||
if err == nil {
|
||
return false
|
||
}
|
||
|
||
// 检查GORM错误
|
||
if errors.Is(err, gorm.ErrRecordNotFound) ||
|
||
errors.Is(err, gorm.ErrInvalidTransaction) ||
|
||
errors.Is(err, gorm.ErrNotImplemented) {
|
||
return true
|
||
}
|
||
|
||
// 检查数据库相关错误消息
|
||
errorMsg := strings.ToLower(err.Error())
|
||
dbKeywords := []string{
|
||
"database",
|
||
"sql",
|
||
"mysql",
|
||
"postgres",
|
||
"connection pool",
|
||
"deadlock",
|
||
"constraint",
|
||
"duplicate key",
|
||
}
|
||
|
||
for _, keyword := range dbKeywords {
|
||
if strings.Contains(errorMsg, keyword) {
|
||
return true
|
||
}
|
||
}
|
||
|
||
return false
|
||
}
|
||
|
||
// isRetryableDatabaseError 判断是否为可重试的数据库错误
|
||
func isRetryableDatabaseError(err error) bool {
|
||
if err == nil {
|
||
return false
|
||
}
|
||
|
||
// 不可重试的错误
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return false
|
||
}
|
||
|
||
errorMsg := strings.ToLower(err.Error())
|
||
nonRetryableKeywords := []string{
|
||
"constraint",
|
||
"duplicate key",
|
||
"foreign key",
|
||
"syntax error",
|
||
}
|
||
|
||
for _, keyword := range nonRetryableKeywords {
|
||
if strings.Contains(errorMsg, keyword) {
|
||
return false
|
||
}
|
||
}
|
||
|
||
return true
|
||
}
|
||
|
||
// isTimeoutError reports whether err is a timeout. That means its chain
// contains context.DeadlineExceeded or a net.Error whose Timeout() is true, or
// its lower-cased message mentions "timeout" or "deadline exceeded". A nil
// err is never a timeout.
func isTimeoutError(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, context.DeadlineExceeded):
		return true
	}

	var ne net.Error
	if errors.As(err, &ne) && ne.Timeout() {
		return true
	}

	text := strings.ToLower(err.Error())
	return strings.Contains(text, "timeout") || strings.Contains(text, "deadline exceeded")
}
|
||
|
||
// isRateLimitError reports whether err's lower-cased message mentions rate
// limiting: "rate limit", "too many requests", "429", "quota exceeded" or
// "throttle". A nil err is never a rate-limit error.
//
// NOTE(review): "429" is a plain substring match, so any longer number
// containing those digits (order IDs, exchange error codes) also matches.
func isRateLimitError(err error) bool {
	if err == nil {
		return false
	}

	switch msg := strings.ToLower(err.Error()); {
	case strings.Contains(msg, "rate limit"),
		strings.Contains(msg, "too many requests"),
		strings.Contains(msg, "429"),
		strings.Contains(msg, "quota exceeded"),
		strings.Contains(msg, "throttle"):
		return true
	default:
		return false
	}
}
|
||
|
||
// isAuthError reports whether err's lower-cased message indicates an
// authentication/authorization failure. It matches unauthorized, forbidden,
// invalid signature, API key problems, and HTTP 401/403. A nil err is never an
// auth error.
//
// Binance spells key problems as "API-key" (e.g. code -2015 "Invalid API-key,
// IP, or permissions for action."). The hyphenated form is matched too, so
// such errors are not mistaken for transient failures and retried.
func isAuthError(err error) bool {
	if err == nil {
		return false
	}

	errorMsg := strings.ToLower(err.Error())
	authKeywords := []string{
		"unauthorized",
		"authentication",
		"invalid signature",
		"api key",
		"api-key",
		"401",
		"403",
		"forbidden",
	}

	for _, keyword := range authKeywords {
		if strings.Contains(errorMsg, keyword) {
			return true
		}
	}

	return false
}
|
||
|
||
// isValidationError reports whether err's lower-cased message describes
// rejected input: "validation", "invalid parameter", "bad request", "400",
// "missing required" or "invalid format". A nil err is never a validation
// error.
//
// NOTE(review): "400" is a plain substring match and also hits longer numbers
// that contain it.
func isValidationError(err error) bool {
	if err == nil {
		return false
	}

	text := strings.ToLower(err.Error())
	for _, marker := range [...]string{
		"validation",
		"invalid parameter",
		"bad request",
		"400",
		"missing required",
		"invalid format",
	} {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}
|
||
|
||
// isBusinessError reports whether err's lower-cased message describes a
// trading-domain rejection. That covers insufficient balance, a missing
// order/position/symbol, an invalid order status, or a closed market. A nil
// err is never a business error.
func isBusinessError(err error) bool {
	if err == nil {
		return false
	}

	text := strings.ToLower(err.Error())
	phrases := [...]string{
		"insufficient balance",
		"order not found",
		"position not found",
		"invalid order status",
		"market closed",
		"symbol not found",
	}

	found := false
	for i := 0; i < len(phrases) && !found; i++ {
		found = strings.Contains(text, phrases[i])
	}
	return found
}
|
||
|
||
// GlobalErrorHandler is the package-wide ErrorHandler assigned by
// InitErrorHandler. It is nil until InitErrorHandler runs, in which case
// HandleErrorWithRetry calls the operation once without retries.
var GlobalErrorHandler *ErrorHandler
|
||
|
||
// InitErrorHandler 初始化错误处理器
|
||
func InitErrorHandler(config *OptimizedConfig, metricsCollector *MetricsCollector) {
|
||
GlobalErrorHandler = NewErrorHandler(config, metricsCollector)
|
||
}
|
||
|
||
// GetErrorHandler 获取全局错误处理器
|
||
func GetErrorHandler() *ErrorHandler {
|
||
return GlobalErrorHandler
|
||
}
|
||
|
||
// HandleErrorWithRetry 处理错误并重试的便捷函数
|
||
func HandleErrorWithRetry(ctx context.Context, operation func() error, context string) error {
|
||
if GlobalErrorHandler == nil {
|
||
return operation()
|
||
}
|
||
|
||
config := RetryConfig{
|
||
MaxRetries: 3,
|
||
RetryDelay: 100 * time.Millisecond,
|
||
MaxRetryDelay: 5 * time.Second,
|
||
BackoffFactor: 2.0,
|
||
ApiRetryCount: 3,
|
||
DbRetryCount: 3,
|
||
}
|
||
|
||
return GlobalErrorHandler.RetryWithBackoff(ctx, operation, config, context)
|
||
} |