package binanceservice

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-admin-team/go-admin-core/logger"
	"github.com/shopspring/decimal"
)

// MetricsCollector is the metrics collector. It groups the business,
// technical and system metric structs declared below together with the
// configuration and the context used to stop its background goroutines.
type MetricsCollector struct {
	mu sync.RWMutex

	// Business metrics.
	OrderMetrics      *OrderMetrics
	PositionMetrics   *PositionMetrics
	TakeProfitMetrics *TakeProfitMetrics

	// Technical metrics.
	PerformanceMetrics *PerformanceMetrics
	ErrorMetrics       *ErrorMetrics

	// System metrics.
	SystemMetrics *SystemMetrics

	// Configuration.
	config *OptimizedConfig

	// Lifecycle control: cancel ends ctx, which the background loops watch.
	ctx       context.Context
	cancel    context.CancelFunc
	startTime time.Time
}

// OrderMetrics holds order-related metrics.
type OrderMetrics struct {
	// Order processing counters.
	TotalProcessed   int64 `json:"total_processed"`
	SuccessfulOrders int64 `json:"successful_orders"`
	FailedOrders     int64 `json:"failed_orders"`
	CanceledOrders   int64 `json:"canceled_orders"`

	// Reverse order counters.
	ReverseOrdersCreated int64 `json:"reverse_orders_created"`
	ReverseOrdersFailed  int64 `json:"reverse_orders_failed"`

	// Response time statistics. TotalResponseTime is a running sum; the
	// average is derived from it and TotalProcessed.
	TotalResponseTime time.Duration `json:"total_response_time"`
	MaxResponseTime   time.Duration `json:"max_response_time"`
	MinResponseTime   time.Duration `json:"min_response_time"`

	// Time of the most recent update.
	LastUpdated time.Time `json:"last_updated"`
}

// PositionMetrics holds position-related metrics.
type PositionMetrics struct {
	// Position synchronisation counters.
	SyncAttempts  int64 `json:"sync_attempts"`
	SyncSuccesses int64 `json:"sync_successes"`
	SyncFailures  int64 `json:"sync_failures"`

	// Position difference statistics.
	PositionMismatches int64           `json:"position_mismatches"`
	MaxDifference      decimal.Decimal `json:"max_difference"`
	TotalDifference    decimal.Decimal `json:"total_difference"`

	// Position close counters.
	ClosePositions      int64 `json:"close_positions"`
	ForceClosePositions int64 `json:"force_close_positions"`

	LastUpdated time.Time `json:"last_updated"`
}

// TakeProfitMetrics holds take-profit / stop-loss related metrics.
type TakeProfitMetrics struct {
	// Take-profit / stop-loss creation counters.
	TakeProfitCreated int64 `json:"take_profit_created"`
	StopLossCreated   int64 `json:"stop_loss_created"`

	// Take-profit / stop-loss execution counters.
	TakeProfitExecuted int64 `json:"take_profit_executed"`
	StopLossExecuted   int64 `json:"stop_loss_executed"`

	// Cancellation counters.
	OrdersCanceled       int64 `json:"orders_canceled"`
	BatchCancelAttempts  int64 `json:"batch_cancel_attempts"`
	BatchCancelSuccesses int64 `json:"batch_cancel_successes"`

	LastUpdated time.Time `json:"last_updated"`
}

// PerformanceMetrics holds performance-related metrics.
type PerformanceMetrics struct {
	// Concurrency statistics.
	ConcurrentRequests int64 `json:"concurrent_requests"`
	MaxConcurrency     int64 `json:"max_concurrency"`

	// Lock statistics. LockWaitTime is a running sum.
	LockAcquisitions int64         `json:"lock_acquisitions"`
	LockWaitTime     time.Duration `json:"lock_wait_time"`
	LockTimeouts     int64         `json:"lock_timeouts"`

	// Database operation statistics. DbQueryTime is a running sum.
	DbQueries      int64         `json:"db_queries"`
	DbTransactions int64         `json:"db_transactions"`
	DbQueryTime    time.Duration `json:"db_query_time"`
	DbErrors       int64         `json:"db_errors"`

	// API call statistics. ApiCallTime is a running sum.
	ApiCalls    int64         `json:"api_calls"`
	ApiCallTime time.Duration `json:"api_call_time"`
	ApiErrors   int64         `json:"api_errors"`
	ApiRetries  int64         `json:"api_retries"`

	LastUpdated time.Time `json:"last_updated"`
}

// ErrorMetrics holds error-related metrics.
type ErrorMetrics struct {
	// Error counters by category.
	NetworkErrors  int64 `json:"network_errors"`
	DatabaseErrors int64 `json:"database_errors"`
	BusinessErrors int64 `json:"business_errors"`
	SystemErrors   int64 `json:"system_errors"`

	// Retry counters.
	RetryAttempts  int64 `json:"retry_attempts"`
	RetrySuccesses int64 `json:"retry_successes"`
	RetryFailures  int64 `json:"retry_failures"`

	// Most recent errors, oldest first.
	RecentErrors []ErrorRecord `json:"recent_errors"`

	LastUpdated time.Time `json:"last_updated"`
}

// SystemMetrics holds process/runtime-level metrics.
type SystemMetrics struct {
	// Memory usage.
	MemoryUsage   uint64  `json:"memory_usage"`
	MemoryPercent float64 `json:"memory_percent"`

	// CPU usage.
	CpuPercent float64 `json:"cpu_percent"`

	// Goroutine statistics.
	GoroutineCount int `json:"goroutine_count"`

	// Garbage collector statistics.
	GcCount   uint32 `json:"gc_count"`
	GcPauseMs uint64 `json:"gc_pause_ms"`

	LastUpdated time.Time `json:"last_updated"`
}

// ErrorRecord is a single recorded error.
type ErrorRecord struct {
	Timestamp time.Time `json:"timestamp"`
	Type      string    `json:"type"`
	Message   string    `json:"message"`
	Context   string    `json:"context"`
}

// NewMetricsCollector creates a new metrics collector.
func NewMetricsCollector(config *OptimizedConfig) *MetricsCollector { ctx, cancel := context.WithCancel(context.Background()) return &MetricsCollector{ OrderMetrics: &OrderMetrics{ MinResponseTime: time.Hour, // 初始化为一个大值 LastUpdated: time.Now(), }, PositionMetrics: &PositionMetrics{ MaxDifference: decimal.Zero, TotalDifference: decimal.Zero, LastUpdated: time.Now(), }, TakeProfitMetrics: &TakeProfitMetrics{ LastUpdated: time.Now(), }, PerformanceMetrics: &PerformanceMetrics{ LastUpdated: time.Now(), }, ErrorMetrics: &ErrorMetrics{ RecentErrors: make([]ErrorRecord, 0, 100), LastUpdated: time.Now(), }, SystemMetrics: &SystemMetrics{ LastUpdated: time.Now(), }, config: config, ctx: ctx, cancel: cancel, startTime: time.Now(), } } // Start 启动指标收集 func (mc *MetricsCollector) Start() { if !mc.config.MonitorConfig.Enabled { return } logger.Info("启动指标收集器") // 启动定期收集 go mc.collectLoop() // 启动告警检查 go mc.alertLoop() } // Stop 停止指标收集 func (mc *MetricsCollector) Stop() { logger.Info("停止指标收集器") mc.cancel() } // collectLoop 收集循环 func (mc *MetricsCollector) collectLoop() { ticker := time.NewTicker(mc.config.MonitorConfig.CollectInterval) defer ticker.Stop() for { select { case <-mc.ctx.Done(): return case <-ticker.C: mc.collectSystemMetrics() } } } // alertLoop 告警检查循环 func (mc *MetricsCollector) alertLoop() { ticker := time.NewTicker(time.Minute) // 每分钟检查一次 defer ticker.Stop() for { select { case <-mc.ctx.Done(): return case <-ticker.C: mc.checkAlerts() } } } // RecordOrderProcessed 记录订单处理 func (mc *MetricsCollector) RecordOrderProcessed(success bool, responseTime time.Duration) { mc.mu.Lock() defer mc.mu.Unlock() atomic.AddInt64(&mc.OrderMetrics.TotalProcessed, 1) if success { atomic.AddInt64(&mc.OrderMetrics.SuccessfulOrders, 1) } else { atomic.AddInt64(&mc.OrderMetrics.FailedOrders, 1) } // 更新响应时间统计 mc.OrderMetrics.TotalResponseTime += responseTime if responseTime > mc.OrderMetrics.MaxResponseTime { mc.OrderMetrics.MaxResponseTime = responseTime } if responseTime < 
mc.OrderMetrics.MinResponseTime { mc.OrderMetrics.MinResponseTime = responseTime } mc.OrderMetrics.LastUpdated = time.Now() } // RecordOrderCanceled 记录订单取消 func (mc *MetricsCollector) RecordOrderCanceled() { atomic.AddInt64(&mc.OrderMetrics.CanceledOrders, 1) mc.OrderMetrics.LastUpdated = time.Now() } // RecordReverseOrder 记录反向订单 func (mc *MetricsCollector) RecordReverseOrder(success bool) { if success { atomic.AddInt64(&mc.OrderMetrics.ReverseOrdersCreated, 1) } else { atomic.AddInt64(&mc.OrderMetrics.ReverseOrdersFailed, 1) } mc.OrderMetrics.LastUpdated = time.Now() } // RecordPositionSync 记录持仓同步 func (mc *MetricsCollector) RecordPositionSync(success bool, difference decimal.Decimal) { mc.mu.Lock() defer mc.mu.Unlock() atomic.AddInt64(&mc.PositionMetrics.SyncAttempts, 1) if success { atomic.AddInt64(&mc.PositionMetrics.SyncSuccesses, 1) } else { atomic.AddInt64(&mc.PositionMetrics.SyncFailures, 1) } // 更新差异统计 if !difference.IsZero() { atomic.AddInt64(&mc.PositionMetrics.PositionMismatches, 1) mc.PositionMetrics.TotalDifference = mc.PositionMetrics.TotalDifference.Add(difference.Abs()) if difference.Abs().GreaterThan(mc.PositionMetrics.MaxDifference) { mc.PositionMetrics.MaxDifference = difference.Abs() } } mc.PositionMetrics.LastUpdated = time.Now() } // RecordClosePosition 记录平仓 func (mc *MetricsCollector) RecordClosePosition(forced bool) { atomic.AddInt64(&mc.PositionMetrics.ClosePositions, 1) if forced { atomic.AddInt64(&mc.PositionMetrics.ForceClosePositions, 1) } mc.PositionMetrics.LastUpdated = time.Now() } // RecordTakeProfitCreated 记录止盈止损创建 func (mc *MetricsCollector) RecordTakeProfitCreated(orderType string) { switch orderType { case "TAKE_PROFIT", "TAKE_PROFIT_MARKET": atomic.AddInt64(&mc.TakeProfitMetrics.TakeProfitCreated, 1) case "STOP", "STOP_MARKET": atomic.AddInt64(&mc.TakeProfitMetrics.StopLossCreated, 1) } mc.TakeProfitMetrics.LastUpdated = time.Now() } // RecordTakeProfitExecuted 记录止盈止损执行 func (mc *MetricsCollector) 
RecordTakeProfitExecuted(orderType string) { switch orderType { case "TAKE_PROFIT", "TAKE_PROFIT_MARKET": atomic.AddInt64(&mc.TakeProfitMetrics.TakeProfitExecuted, 1) case "STOP", "STOP_MARKET": atomic.AddInt64(&mc.TakeProfitMetrics.StopLossExecuted, 1) } mc.TakeProfitMetrics.LastUpdated = time.Now() } // RecordOrdersCanceled 记录订单取消 func (mc *MetricsCollector) RecordOrdersCanceled(count int) { atomic.AddInt64(&mc.TakeProfitMetrics.OrdersCanceled, int64(count)) mc.TakeProfitMetrics.LastUpdated = time.Now() } // RecordBatchCancel 记录批量取消 func (mc *MetricsCollector) RecordBatchCancel(success bool) { atomic.AddInt64(&mc.TakeProfitMetrics.BatchCancelAttempts, 1) if success { atomic.AddInt64(&mc.TakeProfitMetrics.BatchCancelSuccesses, 1) } mc.TakeProfitMetrics.LastUpdated = time.Now() } // RecordLockOperation 记录锁操作 func (mc *MetricsCollector) RecordLockOperation(acquired bool, waitTime time.Duration) { atomic.AddInt64(&mc.PerformanceMetrics.LockAcquisitions, 1) mc.PerformanceMetrics.LockWaitTime += waitTime if !acquired { atomic.AddInt64(&mc.PerformanceMetrics.LockTimeouts, 1) } mc.PerformanceMetrics.LastUpdated = time.Now() } // RecordDbOperation 记录数据库操作 func (mc *MetricsCollector) RecordDbOperation(isTransaction bool, duration time.Duration, err error) { if isTransaction { atomic.AddInt64(&mc.PerformanceMetrics.DbTransactions, 1) } else { atomic.AddInt64(&mc.PerformanceMetrics.DbQueries, 1) } mc.PerformanceMetrics.DbQueryTime += duration if err != nil { atomic.AddInt64(&mc.PerformanceMetrics.DbErrors, 1) mc.RecordError("database", err.Error(), "db_operation") } mc.PerformanceMetrics.LastUpdated = time.Now() } // RecordApiCall 记录API调用 func (mc *MetricsCollector) RecordApiCall(duration time.Duration, err error, retryCount int) { atomic.AddInt64(&mc.PerformanceMetrics.ApiCalls, 1) mc.PerformanceMetrics.ApiCallTime += duration if err != nil { atomic.AddInt64(&mc.PerformanceMetrics.ApiErrors, 1) mc.RecordError("api", err.Error(), "api_call") } if retryCount > 0 { 
atomic.AddInt64(&mc.PerformanceMetrics.ApiRetries, int64(retryCount)) } mc.PerformanceMetrics.LastUpdated = time.Now() } // RecordConcurrency 记录并发数 func (mc *MetricsCollector) RecordConcurrency(current int64) { atomic.StoreInt64(&mc.PerformanceMetrics.ConcurrentRequests, current) if current > atomic.LoadInt64(&mc.PerformanceMetrics.MaxConcurrency) { atomic.StoreInt64(&mc.PerformanceMetrics.MaxConcurrency, current) } mc.PerformanceMetrics.LastUpdated = time.Now() } // RecordError 记录错误 func (mc *MetricsCollector) RecordError(errorType, message, context string) { mc.mu.Lock() defer mc.mu.Unlock() // 分类统计 switch errorType { case "network": atomic.AddInt64(&mc.ErrorMetrics.NetworkErrors, 1) case "database": atomic.AddInt64(&mc.ErrorMetrics.DatabaseErrors, 1) case "business": atomic.AddInt64(&mc.ErrorMetrics.BusinessErrors, 1) default: atomic.AddInt64(&mc.ErrorMetrics.SystemErrors, 1) } // 记录最近错误 errorRecord := ErrorRecord{ Timestamp: time.Now(), Type: errorType, Message: message, Context: context, } // 保持最近100个错误 if len(mc.ErrorMetrics.RecentErrors) >= 100 { mc.ErrorMetrics.RecentErrors = mc.ErrorMetrics.RecentErrors[1:] } mc.ErrorMetrics.RecentErrors = append(mc.ErrorMetrics.RecentErrors, errorRecord) mc.ErrorMetrics.LastUpdated = time.Now() } // RecordRetry 记录重试 func (mc *MetricsCollector) RecordRetry(success bool) { atomic.AddInt64(&mc.ErrorMetrics.RetryAttempts, 1) if success { atomic.AddInt64(&mc.ErrorMetrics.RetrySuccesses, 1) } else { atomic.AddInt64(&mc.ErrorMetrics.RetryFailures, 1) } mc.ErrorMetrics.LastUpdated = time.Now() } // collectSystemMetrics 收集系统指标 func (mc *MetricsCollector) collectSystemMetrics() { mc.mu.Lock() defer mc.mu.Unlock() // 获取内存统计 var m runtime.MemStats runtime.ReadMemStats(&m) mc.SystemMetrics.MemoryUsage = m.Alloc mc.SystemMetrics.GoroutineCount = runtime.NumGoroutine() mc.SystemMetrics.GcCount = m.NumGC mc.SystemMetrics.GcPauseMs = uint64(m.PauseNs[(m.NumGC+255)%256]) / 1000000 mc.SystemMetrics.LastUpdated = time.Now() } // checkAlerts 
检查告警 func (mc *MetricsCollector) checkAlerts() { thresholds := mc.config.MonitorConfig.AlertThresholds // 检查订单失败率 if mc.OrderMetrics.TotalProcessed > 0 { failureRate := float64(mc.OrderMetrics.FailedOrders) / float64(mc.OrderMetrics.TotalProcessed) * 100 if failureRate > thresholds.OrderFailureRate { logger.Warnf("订单失败率告警: %.2f%% (阈值: %.2f%%)", failureRate, thresholds.OrderFailureRate) } } // 检查持仓同步失败率 if mc.PositionMetrics.SyncAttempts > 0 { syncFailureRate := float64(mc.PositionMetrics.SyncFailures) / float64(mc.PositionMetrics.SyncAttempts) * 100 if syncFailureRate > thresholds.PositionSyncFailureRate { logger.Warnf("持仓同步失败率告警: %.2f%% (阈值: %.2f%%)", syncFailureRate, thresholds.PositionSyncFailureRate) } } // 检查API失败率 if mc.PerformanceMetrics.ApiCalls > 0 { apiFailureRate := float64(mc.PerformanceMetrics.ApiErrors) / float64(mc.PerformanceMetrics.ApiCalls) * 100 if apiFailureRate > thresholds.ApiFailureRate { logger.Warnf("API失败率告警: %.2f%% (阈值: %.2f%%)", apiFailureRate, thresholds.ApiFailureRate) } } // 检查内存使用率 if mc.SystemMetrics.MemoryPercent > thresholds.MemoryUsageThreshold { logger.Warnf("内存使用率告警: %.2f%% (阈值: %.2f%%)", mc.SystemMetrics.MemoryPercent, thresholds.MemoryUsageThreshold) } // 检查CPU使用率 if mc.SystemMetrics.CpuPercent > thresholds.CpuUsageThreshold { logger.Warnf("CPU使用率告警: %.2f%% (阈值: %.2f%%)", mc.SystemMetrics.CpuPercent, thresholds.CpuUsageThreshold) } } // GetMetrics 获取所有指标 func (mc *MetricsCollector) GetMetrics() map[string]interface{} { mc.mu.RLock() defer mc.mu.RUnlock() return map[string]interface{}{ "order_metrics": mc.OrderMetrics, "position_metrics": mc.PositionMetrics, "take_profit_metrics": mc.TakeProfitMetrics, "performance_metrics": mc.PerformanceMetrics, "error_metrics": mc.ErrorMetrics, "system_metrics": mc.SystemMetrics, "uptime": time.Since(mc.startTime).String(), "collected_at": time.Now(), } } // GetSummary 获取指标摘要 func (mc *MetricsCollector) GetSummary() map[string]interface{} { mc.mu.RLock() defer mc.mu.RUnlock() summary := 
make(map[string]interface{}) // 订单摘要 if mc.OrderMetrics.TotalProcessed > 0 { successRate := float64(mc.OrderMetrics.SuccessfulOrders) / float64(mc.OrderMetrics.TotalProcessed) * 100 avgResponseTime := mc.OrderMetrics.TotalResponseTime / time.Duration(mc.OrderMetrics.TotalProcessed) summary["order_success_rate"] = fmt.Sprintf("%.2f%%", successRate) summary["avg_response_time"] = avgResponseTime.String() summary["total_orders"] = mc.OrderMetrics.TotalProcessed } // 持仓摘要 if mc.PositionMetrics.SyncAttempts > 0 { syncSuccessRate := float64(mc.PositionMetrics.SyncSuccesses) / float64(mc.PositionMetrics.SyncAttempts) * 100 summary["position_sync_rate"] = fmt.Sprintf("%.2f%%", syncSuccessRate) summary["position_mismatches"] = mc.PositionMetrics.PositionMismatches } // 性能摘要 summary["current_concurrency"] = mc.PerformanceMetrics.ConcurrentRequests summary["max_concurrency"] = mc.PerformanceMetrics.MaxConcurrency summary["total_api_calls"] = mc.PerformanceMetrics.ApiCalls summary["total_db_queries"] = mc.PerformanceMetrics.DbQueries // 系统摘要 summary["memory_usage_mb"] = mc.SystemMetrics.MemoryUsage / 1024 / 1024 summary["goroutine_count"] = mc.SystemMetrics.GoroutineCount summary["uptime"] = time.Since(mc.startTime).String() return summary } // 全局指标收集器实例 var GlobalMetricsCollector *MetricsCollector // InitMetricsCollector 初始化指标收集器 func InitMetricsCollector(config *OptimizedConfig) { GlobalMetricsCollector = NewMetricsCollector(config) GlobalMetricsCollector.Start() } // GetMetricsCollector 获取全局指标收集器 func GetMetricsCollector() *MetricsCollector { return GlobalMetricsCollector }