196 lines
5.0 KiB
Go
196 lines
5.0 KiB
Go
package redishelper
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"go-admin/utils/utility"
|
|
"math/rand"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
log "github.com/go-admin-team/go-admin-core/logger"
|
|
"github.com/go-redis/redis/v8"
|
|
)
|
|
|
|
const (
|
|
tolerance = 500 // milliseconds
|
|
millisPerSecond = 1000
|
|
lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
|
|
redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
|
|
return "OK"
|
|
else
|
|
return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
|
|
end`
|
|
delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
|
|
return redis.call("DEL", KEYS[1])
|
|
else
|
|
return 0
|
|
end`
|
|
touchCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
|
|
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
|
|
else
|
|
return 0
|
|
end`
|
|
)
|
|
|
|
var (
|
|
clientRedisLock *redis.Client
|
|
onece sync.Once
|
|
ErrFailed = errors.New("redsync: failed to acquire lock")
|
|
)
|
|
|
|
// InitLockRedisConn 初始化 Redis 连接
|
|
func InitLockRedisConn(host, password, dbIndex string) {
|
|
onece.Do(func() {
|
|
dbIndexInt, err := strconv.Atoi(dbIndex)
|
|
if err != nil {
|
|
dbIndexInt = 0
|
|
}
|
|
clientRedisLock = redis.NewClient(&redis.Options{
|
|
Addr: host,
|
|
Password: password,
|
|
DB: dbIndexInt,
|
|
})
|
|
|
|
// 测试连接
|
|
if _, err := clientRedisLock.Ping(context.Background()).Result(); err != nil {
|
|
log.Errorf("Failed to connect to Redis :%v", err)
|
|
panic(err)
|
|
}
|
|
|
|
fmt.Println("redis lock初始化完毕")
|
|
})
|
|
}
|
|
|
|
// RedisLock 分布式锁结构
|
|
type RedisLock struct {
|
|
redisClient *redis.Client
|
|
key string
|
|
id string
|
|
expiry time.Duration
|
|
retryCount int // 重试次数
|
|
retryInterval time.Duration // 重试间隔时间
|
|
seconds uint32
|
|
}
|
|
|
|
// NewRedisLock 创建一个新的 Redis 锁
|
|
func NewRedisLock(key string, timeout uint32, retryCount int, retryInterval time.Duration) *RedisLock {
|
|
return &RedisLock{
|
|
redisClient: clientRedisLock,
|
|
key: key,
|
|
id: utility.GetXid(),
|
|
expiry: time.Duration(timeout) * time.Second,
|
|
retryCount: retryCount,
|
|
retryInterval: retryInterval,
|
|
seconds: timeout,
|
|
}
|
|
}
|
|
|
|
// Acquire 获取锁
|
|
func (rl *RedisLock) Acquire() (bool, error) {
|
|
return rl.acquireCtx(context.Background())
|
|
}
|
|
|
|
// AcquireWait 获取锁,支持重试和超时
|
|
func (rl *RedisLock) AcquireWait(ctx context.Context) (bool, error) {
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
|
|
for i := 0; i < rl.retryCount; i++ {
|
|
if i > 0 {
|
|
// 指数退避
|
|
baseInterval := time.Duration(int64(rl.retryInterval) * (1 << i)) // 指数增长
|
|
if baseInterval > time.Second {
|
|
baseInterval = time.Second
|
|
}
|
|
|
|
if baseInterval <= 0 {
|
|
baseInterval = time.Millisecond * 100 // 至少 100ms
|
|
}
|
|
// 随机退避
|
|
retryInterval := time.Duration(rand.Int63n(int64(baseInterval))) // 随机退避
|
|
if retryInterval < time.Millisecond*100 {
|
|
retryInterval = time.Millisecond * 100 // 至少 100ms
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return false, ctx.Err()
|
|
case <-time.After(retryInterval):
|
|
}
|
|
}
|
|
|
|
ok, err := rl.acquireCtx(ctx)
|
|
if ok {
|
|
return true, nil
|
|
}
|
|
if err != nil {
|
|
log.Errorf("Failed to acquire lock %s:%v", rl.key, err)
|
|
}
|
|
}
|
|
return false, ErrFailed
|
|
}
|
|
|
|
func safeRandomDuration(max time.Duration) time.Duration {
|
|
if max <= 0 {
|
|
return 100 * time.Millisecond // fallback default
|
|
}
|
|
return time.Duration(rand.Int63n(int64(max)))
|
|
}
|
|
|
|
// Release 释放锁
|
|
func (rl *RedisLock) Release() (bool, error) {
|
|
return rl.releaseCtx(context.Background())
|
|
}
|
|
|
|
// Touch 续期锁
|
|
func (rl *RedisLock) Touch() (bool, error) {
|
|
return rl.touchCtx(context.Background())
|
|
}
|
|
|
|
// acquireCtx 获取锁的核心逻辑
|
|
func (rl *RedisLock) acquireCtx(ctx context.Context) (bool, error) {
|
|
resp, err := rl.redisClient.Eval(ctx, lockCommand, []string{rl.key}, []string{
|
|
rl.id, strconv.Itoa(int(rl.seconds)*millisPerSecond + tolerance),
|
|
}).Result()
|
|
if err == redis.Nil {
|
|
return false, nil
|
|
} else if err != nil {
|
|
log.Error("Error acquiring lock key:%s %v", rl.key, err)
|
|
return false, err
|
|
}
|
|
|
|
reply, ok := resp.(string)
|
|
return ok && reply == "OK", nil
|
|
}
|
|
|
|
// releaseCtx 释放锁的核心逻辑
|
|
func (rl *RedisLock) releaseCtx(ctx context.Context) (bool, error) {
|
|
resp, err := rl.redisClient.Eval(ctx, delCommand, []string{rl.key}, []string{rl.id}).Result()
|
|
if err != nil {
|
|
log.Error("Error releasing lock key:%s %v", rl.key, err)
|
|
return false, err
|
|
}
|
|
|
|
reply, ok := resp.(int64)
|
|
return ok && reply == 1, nil
|
|
}
|
|
|
|
// touchCtx 续期锁的核心逻辑
|
|
func (rl *RedisLock) touchCtx(ctx context.Context) (bool, error) {
|
|
seconds := atomic.LoadUint32(&rl.seconds)
|
|
resp, err := rl.redisClient.Eval(ctx, touchCommand, []string{rl.key}, []string{
|
|
rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
|
|
}).Result()
|
|
if err != nil {
|
|
log.Error("Error touching lock key:%s %v", rl.key, err)
|
|
return false, err
|
|
}
|
|
|
|
reply, ok := resp.(int64)
|
|
return ok && reply == 1, nil
|
|
}
|