184 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			184 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| 
								 | 
							
								package helper
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import (
							 | 
						||
| 
								 | 
							
									"context"
							 | 
						||
| 
								 | 
							
									"errors"
							 | 
						||
| 
								 | 
							
									"go-admin/pkg/utility"
							 | 
						||
| 
								 | 
							
									"math/rand"
							 | 
						||
| 
								 | 
							
									"strconv"
							 | 
						||
| 
								 | 
							
									"sync"
							 | 
						||
| 
								 | 
							
									"sync/atomic"
							 | 
						||
| 
								 | 
							
									"time"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									log "github.com/go-admin-team/go-admin-core/logger"
							 | 
						||
| 
								 | 
							
									"github.com/go-redis/redis/v8"
							 | 
						||
| 
								 | 
							
									"go.uber.org/zap"
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Timing constants used when computing the TTL sent to Redis.
								const (
									// tolerance is added to every TTL, in milliseconds.
									tolerance = 500
									// millisPerSecond converts a timeout given in seconds to milliseconds.
									millisPerSecond = 1000
								)

								// Lua scripts evaluated on the Redis server. Each one compares the value
								// stored at KEYS[1] with ARGV[1] before touching the key.
								const (
									// lockCommand rewrites the key with a TTL of ARGV[2] milliseconds when it
									// already holds ARGV[1]; otherwise it sets the key only if it is absent.
									lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
								    redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
								    return "OK"
								else
								    return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
								end`

									// delCommand deletes the key only when it holds ARGV[1]; otherwise it
									// returns 0.
									delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
								    return redis.call("DEL", KEYS[1])
								else
								    return 0
								end`

									// touchCommand resets the key's TTL to ARGV[2] milliseconds only when it
									// holds ARGV[1]; otherwise it returns 0.
									touchCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
								    return redis.call("PEXPIRE", KEYS[1], ARGV[2])
								else
								    return 0
								end`
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								var (
							 | 
						||
| 
								 | 
							
									clientRedisLock *redis.Client
							 | 
						||
| 
								 | 
							
									onece           sync.Once
							 | 
						||
| 
								 | 
							
									ErrFailed       = errors.New("redsync: failed to acquire lock")
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// InitLockRedisConn 初始化 Redis 连接
							 | 
						||
| 
								 | 
							
								func InitLockRedisConn(host, password, dbIndex string) {
							 | 
						||
| 
								 | 
							
									onece.Do(func() {
							 | 
						||
| 
								 | 
							
										dbIndexInt, err := strconv.Atoi(dbIndex)
							 | 
						||
| 
								 | 
							
										if err != nil {
							 | 
						||
| 
								 | 
							
											dbIndexInt = 0
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										clientRedisLock = redis.NewClient(&redis.Options{
							 | 
						||
| 
								 | 
							
											Addr:     host,
							 | 
						||
| 
								 | 
							
											Password: password,
							 | 
						||
| 
								 | 
							
											DB:       dbIndexInt,
							 | 
						||
| 
								 | 
							
										})
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										// 测试连接
							 | 
						||
| 
								 | 
							
										if _, err := clientRedisLock.Ping(context.Background()).Result(); err != nil {
							 | 
						||
| 
								 | 
							
											log.Error("Failed to connect to Redis", zap.Error(err))
							 | 
						||
| 
								 | 
							
											panic(err)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									})
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// RedisLock 分布式锁结构
							 | 
						||
| 
								 | 
							
								type RedisLock struct {
							 | 
						||
| 
								 | 
							
									redisClient   *redis.Client
							 | 
						||
| 
								 | 
							
									key           string
							 | 
						||
| 
								 | 
							
									id            string
							 | 
						||
| 
								 | 
							
									expiry        time.Duration
							 | 
						||
| 
								 | 
							
									retryCount    int           // 重试次数
							 | 
						||
| 
								 | 
							
									retryInterval time.Duration // 重试间隔时间
							 | 
						||
| 
								 | 
							
									seconds       uint32
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// NewRedisLock 创建一个新的 Redis 锁
							 | 
						||
| 
								 | 
							
								func NewRedisLock(key string, timeout uint32, retryCount int, retryInterval time.Duration) *RedisLock {
							 | 
						||
| 
								 | 
							
									return &RedisLock{
							 | 
						||
| 
								 | 
							
										redisClient:   clientRedisLock,
							 | 
						||
| 
								 | 
							
										key:           key,
							 | 
						||
| 
								 | 
							
										id:            utility.GetXid(),
							 | 
						||
| 
								 | 
							
										expiry:        time.Duration(timeout) * time.Second,
							 | 
						||
| 
								 | 
							
										retryCount:    retryCount,
							 | 
						||
| 
								 | 
							
										retryInterval: retryInterval,
							 | 
						||
| 
								 | 
							
										seconds:       timeout,
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Acquire 获取锁
							 | 
						||
| 
								 | 
							
								func (rl *RedisLock) Acquire() (bool, error) {
							 | 
						||
| 
								 | 
							
									return rl.acquireCtx(context.Background())
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// AcquireWait 获取锁,支持重试和超时
							 | 
						||
| 
								 | 
							
								func (rl *RedisLock) AcquireWait(ctx context.Context) (bool, error) {
							 | 
						||
| 
								 | 
							
									if ctx == nil {
							 | 
						||
| 
								 | 
							
										ctx = context.Background()
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									for i := 0; i < rl.retryCount; i++ {
							 | 
						||
| 
								 | 
							
										if i > 0 {
							 | 
						||
| 
								 | 
							
											// 指数退避
							 | 
						||
| 
								 | 
							
											baseInterval := time.Duration(int64(rl.retryInterval) * (1 << i)) // 指数增长
							 | 
						||
| 
								 | 
							
											if baseInterval > time.Second {
							 | 
						||
| 
								 | 
							
												baseInterval = time.Second
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
											// 随机退避
							 | 
						||
| 
								 | 
							
											retryInterval := time.Duration(rand.Int63n(int64(baseInterval))) // 随机退避
							 | 
						||
| 
								 | 
							
											if retryInterval < time.Millisecond*100 {
							 | 
						||
| 
								 | 
							
												retryInterval = time.Millisecond * 100 // 至少 100ms
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
											select {
							 | 
						||
| 
								 | 
							
											case <-ctx.Done():
							 | 
						||
| 
								 | 
							
												return false, ctx.Err()
							 | 
						||
| 
								 | 
							
											case <-time.After(retryInterval):
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										ok, err := rl.acquireCtx(ctx)
							 | 
						||
| 
								 | 
							
										if ok {
							 | 
						||
| 
								 | 
							
											return true, nil
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										if err != nil {
							 | 
						||
| 
								 | 
							
											log.Error("Failed to acquire lock", zap.String("key", rl.key), zap.Error(err))
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return false, ErrFailed
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Release 释放锁
							 | 
						||
| 
								 | 
							
								func (rl *RedisLock) Release() (bool, error) {
							 | 
						||
| 
								 | 
							
									return rl.releaseCtx(context.Background())
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Touch 续期锁
							 | 
						||
| 
								 | 
							
								func (rl *RedisLock) Touch() (bool, error) {
							 | 
						||
| 
								 | 
							
									return rl.touchCtx(context.Background())
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// acquireCtx 获取锁的核心逻辑
							 | 
						||
| 
								 | 
							
								func (rl *RedisLock) acquireCtx(ctx context.Context) (bool, error) {
							 | 
						||
| 
								 | 
							
									resp, err := rl.redisClient.Eval(ctx, lockCommand, []string{rl.key}, []string{
							 | 
						||
| 
								 | 
							
										rl.id, strconv.Itoa(int(rl.seconds)*millisPerSecond + tolerance),
							 | 
						||
| 
								 | 
							
									}).Result()
							 | 
						||
| 
								 | 
							
									if err == redis.Nil {
							 | 
						||
| 
								 | 
							
										return false, nil
							 | 
						||
| 
								 | 
							
									} else if err != nil {
							 | 
						||
| 
								 | 
							
										log.Error("Error acquiring lock", zap.String("key", rl.key), zap.Error(err))
							 | 
						||
| 
								 | 
							
										return false, err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									reply, ok := resp.(string)
							 | 
						||
| 
								 | 
							
									return ok && reply == "OK", nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// releaseCtx 释放锁的核心逻辑
							 | 
						||
| 
								 | 
							
								func (rl *RedisLock) releaseCtx(ctx context.Context) (bool, error) {
							 | 
						||
| 
								 | 
							
									resp, err := rl.redisClient.Eval(ctx, delCommand, []string{rl.key}, []string{rl.id}).Result()
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										log.Error("Error releasing lock", zap.String("key", rl.key), zap.Error(err))
							 | 
						||
| 
								 | 
							
										return false, err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									reply, ok := resp.(int64)
							 | 
						||
| 
								 | 
							
									return ok && reply == 1, nil
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// touchCtx 续期锁的核心逻辑
							 | 
						||
| 
								 | 
							
								func (rl *RedisLock) touchCtx(ctx context.Context) (bool, error) {
							 | 
						||
| 
								 | 
							
									seconds := atomic.LoadUint32(&rl.seconds)
							 | 
						||
| 
								 | 
							
									resp, err := rl.redisClient.Eval(ctx, touchCommand, []string{rl.key}, []string{
							 | 
						||
| 
								 | 
							
										rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
							 | 
						||
| 
								 | 
							
									}).Result()
							 | 
						||
| 
								 | 
							
									if err != nil {
							 | 
						||
| 
								 | 
							
										log.Error("Error touching lock", zap.String("key", rl.key), zap.Error(err))
							 | 
						||
| 
								 | 
							
										return false, err
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									reply, ok := resp.(int64)
							 | 
						||
| 
								 | 
							
									return ok && reply == 1, nil
							 | 
						||
| 
								 | 
							
								}
							 |