184 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			184 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package helper
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"go-admin/pkg/utility"
 | 
						|
	"math/rand"
 | 
						|
	"strconv"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
 | 
						|
	log "github.com/go-admin-team/go-admin-core/logger"
 | 
						|
	"github.com/go-redis/redis/v8"
 | 
						|
	"go.uber.org/zap"
 | 
						|
)
 | 
						|
 | 
						|
// Timing constants used when converting a lock lifetime into the millisecond
// TTL that is sent to Redis.
const (
	// tolerance is the extra time, in milliseconds, added on top of the
	// requested lifetime.
	tolerance = 500
	// millisPerSecond converts a lifetime given in seconds to milliseconds.
	millisPerSecond = 1000
)

// Lua scripts evaluated on the Redis server. In each of them KEYS[1] is the
// lock key and ARGV[1] is the value identifying the lock holder.
const (
	// lockCommand takes the lock. If the key already holds ARGV[1] the value
	// is written again with a TTL of ARGV[2] milliseconds and "OK" is
	// returned; otherwise the key is set only if it does not exist (NX) with
	// the same TTL.
	lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
    redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
    return "OK"
else
    return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end`

	// delCommand deletes the key only when it still holds ARGV[1]; it returns
	// the DEL result in that case and 0 otherwise.
	delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end`

	// touchCommand resets the key's TTL to ARGV[2] milliseconds only when it
	// still holds ARGV[1]; it returns the PEXPIRE result in that case and 0
	// otherwise.
	touchCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
    return 0
end`
)
 | 
						|
 | 
						|
var (
 | 
						|
	clientRedisLock *redis.Client
 | 
						|
	onece           sync.Once
 | 
						|
	ErrFailed       = errors.New("redsync: failed to acquire lock")
 | 
						|
)
 | 
						|
 | 
						|
// InitLockRedisConn 初始化 Redis 连接
 | 
						|
func InitLockRedisConn(host, password, dbIndex string) {
 | 
						|
	onece.Do(func() {
 | 
						|
		dbIndexInt, err := strconv.Atoi(dbIndex)
 | 
						|
		if err != nil {
 | 
						|
			dbIndexInt = 0
 | 
						|
		}
 | 
						|
		clientRedisLock = redis.NewClient(&redis.Options{
 | 
						|
			Addr:     host,
 | 
						|
			Password: password,
 | 
						|
			DB:       dbIndexInt,
 | 
						|
		})
 | 
						|
 | 
						|
		// 测试连接
 | 
						|
		if _, err := clientRedisLock.Ping(context.Background()).Result(); err != nil {
 | 
						|
			log.Error("Failed to connect to Redis", zap.Error(err))
 | 
						|
			panic(err)
 | 
						|
		}
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// RedisLock 分布式锁结构
 | 
						|
type RedisLock struct {
 | 
						|
	redisClient   *redis.Client
 | 
						|
	key           string
 | 
						|
	id            string
 | 
						|
	expiry        time.Duration
 | 
						|
	retryCount    int           // 重试次数
 | 
						|
	retryInterval time.Duration // 重试间隔时间
 | 
						|
	seconds       uint32
 | 
						|
}
 | 
						|
 | 
						|
// NewRedisLock 创建一个新的 Redis 锁
 | 
						|
func NewRedisLock(key string, timeout uint32, retryCount int, retryInterval time.Duration) *RedisLock {
 | 
						|
	return &RedisLock{
 | 
						|
		redisClient:   clientRedisLock,
 | 
						|
		key:           key,
 | 
						|
		id:            utility.GetXid(),
 | 
						|
		expiry:        time.Duration(timeout) * time.Second,
 | 
						|
		retryCount:    retryCount,
 | 
						|
		retryInterval: retryInterval,
 | 
						|
		seconds:       timeout,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Acquire 获取锁
 | 
						|
func (rl *RedisLock) Acquire() (bool, error) {
 | 
						|
	return rl.acquireCtx(context.Background())
 | 
						|
}
 | 
						|
 | 
						|
// AcquireWait 获取锁,支持重试和超时
 | 
						|
func (rl *RedisLock) AcquireWait(ctx context.Context) (bool, error) {
 | 
						|
	if ctx == nil {
 | 
						|
		ctx = context.Background()
 | 
						|
	}
 | 
						|
 | 
						|
	for i := 0; i < rl.retryCount; i++ {
 | 
						|
		if i > 0 {
 | 
						|
			// 指数退避
 | 
						|
			baseInterval := time.Duration(int64(rl.retryInterval) * (1 << i)) // 指数增长
 | 
						|
			if baseInterval > time.Second {
 | 
						|
				baseInterval = time.Second
 | 
						|
			}
 | 
						|
 | 
						|
			// 随机退避
 | 
						|
			retryInterval := time.Duration(rand.Int63n(int64(baseInterval))) // 随机退避
 | 
						|
			if retryInterval < time.Millisecond*100 {
 | 
						|
				retryInterval = time.Millisecond * 100 // 至少 100ms
 | 
						|
			}
 | 
						|
			select {
 | 
						|
			case <-ctx.Done():
 | 
						|
				return false, ctx.Err()
 | 
						|
			case <-time.After(retryInterval):
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		ok, err := rl.acquireCtx(ctx)
 | 
						|
		if ok {
 | 
						|
			return true, nil
 | 
						|
		}
 | 
						|
		if err != nil {
 | 
						|
			log.Error("Failed to acquire lock", zap.String("key", rl.key), zap.Error(err))
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return false, ErrFailed
 | 
						|
}
 | 
						|
 | 
						|
// Release 释放锁
 | 
						|
func (rl *RedisLock) Release() (bool, error) {
 | 
						|
	return rl.releaseCtx(context.Background())
 | 
						|
}
 | 
						|
 | 
						|
// Touch 续期锁
 | 
						|
func (rl *RedisLock) Touch() (bool, error) {
 | 
						|
	return rl.touchCtx(context.Background())
 | 
						|
}
 | 
						|
 | 
						|
// acquireCtx 获取锁的核心逻辑
 | 
						|
func (rl *RedisLock) acquireCtx(ctx context.Context) (bool, error) {
 | 
						|
	resp, err := rl.redisClient.Eval(ctx, lockCommand, []string{rl.key}, []string{
 | 
						|
		rl.id, strconv.Itoa(int(rl.seconds)*millisPerSecond + tolerance),
 | 
						|
	}).Result()
 | 
						|
	if err == redis.Nil {
 | 
						|
		return false, nil
 | 
						|
	} else if err != nil {
 | 
						|
		log.Error("Error acquiring lock", zap.String("key", rl.key), zap.Error(err))
 | 
						|
		return false, err
 | 
						|
	}
 | 
						|
 | 
						|
	reply, ok := resp.(string)
 | 
						|
	return ok && reply == "OK", nil
 | 
						|
}
 | 
						|
 | 
						|
// releaseCtx 释放锁的核心逻辑
 | 
						|
func (rl *RedisLock) releaseCtx(ctx context.Context) (bool, error) {
 | 
						|
	resp, err := rl.redisClient.Eval(ctx, delCommand, []string{rl.key}, []string{rl.id}).Result()
 | 
						|
	if err != nil {
 | 
						|
		log.Error("Error releasing lock", zap.String("key", rl.key), zap.Error(err))
 | 
						|
		return false, err
 | 
						|
	}
 | 
						|
 | 
						|
	reply, ok := resp.(int64)
 | 
						|
	return ok && reply == 1, nil
 | 
						|
}
 | 
						|
 | 
						|
// touchCtx 续期锁的核心逻辑
 | 
						|
func (rl *RedisLock) touchCtx(ctx context.Context) (bool, error) {
 | 
						|
	seconds := atomic.LoadUint32(&rl.seconds)
 | 
						|
	resp, err := rl.redisClient.Eval(ctx, touchCommand, []string{rl.key}, []string{
 | 
						|
		rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
 | 
						|
	}).Result()
 | 
						|
	if err != nil {
 | 
						|
		log.Error("Error touching lock", zap.String("key", rl.key), zap.Error(err))
 | 
						|
		return false, err
 | 
						|
	}
 | 
						|
 | 
						|
	reply, ok := resp.(int64)
 | 
						|
	return ok && reply == 1, nil
 | 
						|
}
 |