Files
proxy_server/utils/utility/generic_queue.go

372 lines
8.2 KiB
Go
Raw Normal View History

package utility
import (
"sync"
"time"
)
// GenericQueue 通用循环队列
// 支持存储任意类型的数据,实现负载均衡和容错
type GenericQueue[T any] struct {
data []T // 存储数据的数组
current int // 当前指针位置
size int // 队列中的元素数量
capacity int // 队列容量
mutex sync.RWMutex // 读写锁,保证线程安全
lastUsed map[int]time.Time // 记录每个位置的最后使用时间
cooldown time.Duration // 冷却时间,避免频繁使用同一个元素
comparer func(T, T) bool // 比较函数,用于检查重复元素
}
var (
// QuequeMap 全局队列映射表,用于管理多个命名队列
// 使用interface{}类型以支持不同泛型类型的队列
QuequeMap = make(map[string]interface{})
)
// NewGenericQueue 创建新的通用循环队列
// capacity: 队列容量
// comparer: 比较函数,用于检查重复元素(可选)
// cooldown: 元素使用冷却时间可选默认0表示无冷却
func NewGenericQueue[T any](capacity int, comparer func(T, T) bool, cooldown ...time.Duration) *GenericQueue[T] {
cd := time.Duration(0)
if len(cooldown) > 0 {
cd = cooldown[0]
}
return &GenericQueue[T]{
data: make([]T, capacity),
capacity: capacity,
lastUsed: make(map[int]time.Time),
cooldown: cd,
comparer: comparer,
}
}
// Add 添加元素到队列
// item: 要添加的元素
// 返回: 是否添加成功
func (q *GenericQueue[T]) Add(item T) bool {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.size >= q.capacity {
return false // 队列已满
}
// 如果提供了比较函数,检查是否已存在相同的元素
if q.comparer != nil {
for i := 0; i < q.size; i++ {
if q.comparer(q.data[i], item) {
return false // 已存在
}
}
}
q.data[q.size] = item
q.size++
return true
}
// Remove 从队列中移除指定的元素
// item: 要移除的元素
// 返回: 是否移除成功
func (q *GenericQueue[T]) Remove(item T) bool {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.comparer == nil {
return false // 没有比较函数无法移除
}
for i := 0; i < q.size; i++ {
if q.comparer(q.data[i], item) {
// 将后面的元素前移
for j := i; j < q.size-1; j++ {
q.data[j] = q.data[j+1]
}
q.size--
// 调整current指针
if q.current >= q.size && q.size > 0 {
q.current = 0
}
// 清理lastUsed记录
delete(q.lastUsed, i)
return true
}
}
return false
}
// GetNext 获取下一个可用的元素(轮询方式)
// 返回: 元素和是否获取成功
func (q *GenericQueue[T]) GetNext() (T, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
var zero T
if q.size == 0 {
return zero, false // 队列为空
}
// 如果没有设置冷却时间,直接返回下一个元素
if q.cooldown == 0 {
item := q.data[q.current]
q.lastUsed[q.current] = time.Now()
q.current = (q.current + 1) % q.size
return item, true
}
// 寻找可用的元素(考虑冷却时间)
startPos := q.current
for {
lastUsed, exists := q.lastUsed[q.current]
// 如果元素从未使用过,或者已过冷却时间
if !exists || time.Since(lastUsed) >= q.cooldown {
item := q.data[q.current]
q.lastUsed[q.current] = time.Now()
q.current = (q.current + 1) % q.size
return item, true
}
q.current = (q.current + 1) % q.size
// 如果遍历了一圈都没找到可用的元素
if q.current == startPos {
// 返回当前元素,忽略冷却时间
item := q.data[q.current]
q.lastUsed[q.current] = time.Now()
q.current = (q.current + 1) % q.size
return item, true
}
}
}
// GetRandom 随机获取一个元素
// 返回: 元素和是否获取成功
func (q *GenericQueue[T]) GetRandom() (T, bool) {
q.mutex.RLock()
defer q.mutex.RUnlock()
var zero T
if q.size == 0 {
return zero, false
}
// 使用当前时间作为随机种子
index := int(time.Now().UnixNano()) % q.size
item := q.data[index]
q.lastUsed[index] = time.Now()
return item, true
}
// GetAll 获取所有元素的副本
// 返回: 元素切片
func (q *GenericQueue[T]) GetAll() []T {
q.mutex.RLock()
defer q.mutex.RUnlock()
items := make([]T, q.size)
copy(items, q.data[:q.size])
return items
}
// Size 获取队列中的元素数量
// 返回: 元素数量
func (q *GenericQueue[T]) Size() int {
q.mutex.RLock()
defer q.mutex.RUnlock()
return q.size
}
// IsEmpty 检查队列是否为空
// 返回: 是否为空
func (q *GenericQueue[T]) IsEmpty() bool {
q.mutex.RLock()
defer q.mutex.RUnlock()
return q.size == 0
}
// IsFull 检查队列是否已满
// 返回: 是否已满
func (q *GenericQueue[T]) IsFull() bool {
q.mutex.RLock()
defer q.mutex.RUnlock()
return q.size >= q.capacity
}
// Clear 清空队列
func (q *GenericQueue[T]) Clear() {
q.mutex.Lock()
defer q.mutex.Unlock()
q.size = 0
q.current = 0
q.lastUsed = make(map[int]time.Time)
}
// SetCooldown 设置元素使用冷却时间
// cooldown: 冷却时间
func (q *GenericQueue[T]) SetCooldown(cooldown time.Duration) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.cooldown = cooldown
}
// GetUsageInfo 获取元素使用信息
// 返回: 位置使用时间映射
func (q *GenericQueue[T]) GetUsageInfo() map[int]time.Time {
q.mutex.RLock()
defer q.mutex.RUnlock()
usage := make(map[int]time.Time)
for k, v := range q.lastUsed {
usage[k] = v
}
return usage
}
// BatchAdd 批量添加元素
// items: 要添加的元素切片
// 返回: 成功添加的数量
func (q *GenericQueue[T]) BatchAdd(items []T) int {
count := 0
for _, item := range items {
if q.Add(item) {
count++
}
}
return count
}
// Replace 替换所有元素
// items: 新的元素切片
// 返回: 是否替换成功
func (q *GenericQueue[T]) Replace(items []T) bool {
if len(items) > q.capacity {
return false
}
q.mutex.Lock()
defer q.mutex.Unlock()
q.size = 0
q.current = 0
q.lastUsed = make(map[int]time.Time)
for _, item := range items {
q.data[q.size] = item
q.size++
}
return true
}
// ReplaceItem 替换指定的单个元素
// oldItem: 要被替换的元素
// newItem: 新的元素
// 返回: 是否替换成功
func (q *GenericQueue[T]) ReplaceItem(oldItem, newItem T) bool {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.comparer == nil {
return false // 没有比较函数无法查找元素
}
for i := 0; i < q.size; i++ {
if q.comparer(q.data[i], oldItem) {
q.data[i] = newItem
return true
}
}
return false // 未找到要替换的元素
}
// Enqueue 入队操作(队列尾部添加元素)
// item: 要添加的元素
// 返回: 是否添加成功
func (q *GenericQueue[T]) Enqueue(item T) bool {
return q.Add(item)
}
// Dequeue 出队操作(从队列头部移除并返回元素)
// 返回: 元素和是否成功
func (q *GenericQueue[T]) Dequeue() (T, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
var zero T
if q.size == 0 {
return zero, false // 队列为空
}
// 获取队列头部元素
item := q.data[0]
// 将后面的元素前移
for i := 0; i < q.size-1; i++ {
q.data[i] = q.data[i+1]
}
q.size--
// 调整current指针
if q.current > 0 {
q.current--
}
if q.current >= q.size && q.size > 0 {
q.current = 0
}
// 重新映射lastUsed因为索引发生了变化
newLastUsed := make(map[int]time.Time)
for index, lastTime := range q.lastUsed {
if index > 0 {
newLastUsed[index-1] = lastTime
}
}
q.lastUsed = newLastUsed
return item, true
}
// Peek 查看队列头部元素(不移除)
// 返回: 元素和是否成功
func (q *GenericQueue[T]) Peek() (T, bool) {
q.mutex.RLock()
defer q.mutex.RUnlock()
var zero T
if q.size == 0 {
return zero, false // 队列为空
}
return q.data[0], true
}
// PeekLast 查看队列尾部元素(不移除)
// 返回: 元素和是否成功
func (q *GenericQueue[T]) PeekLast() (T, bool) {
q.mutex.RLock()
defer q.mutex.RUnlock()
var zero T
if q.size == 0 {
return zero, false // 队列为空
}
return q.data[q.size-1], true
}
// ApiKeyInfo API密钥信息结构体
type ApiKeyInfo struct {
Key string `json:"key"` // API密钥
Name string `json:"name"` // 密钥名称
Weight int `json:"weight"` // 权重
Enabled bool `json:"enabled"` // 是否启用
Metadata map[string]string `json:"metadata"` // 元数据
}