Files
exchange_go/common/helper/redislock.go

197 lines
4.9 KiB
Go

package helper
import (
"context"
"errors"
"fmt"
"go-admin/pkg/utility"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
log "github.com/go-admin-team/go-admin-core/logger"
"github.com/go-redis/redis/v8"
"go.uber.org/zap"
)
const (
tolerance = 500 // milliseconds
millisPerSecond = 1000
lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
return "OK"
else
return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end`
delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end`
touchCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
return 0
end`
)
var (
clientRedisLock *redis.Client
onece sync.Once
ErrFailed = errors.New("redsync: failed to acquire lock")
)
// InitLockRedisConn 初始化 Redis 连接
func InitLockRedisConn(host, password, dbIndex string) {
onece.Do(func() {
dbIndexInt, err := strconv.Atoi(dbIndex)
if err != nil {
dbIndexInt = 0
}
clientRedisLock = redis.NewClient(&redis.Options{
Addr: host,
Password: password,
DB: dbIndexInt,
})
// 测试连接
if _, err := clientRedisLock.Ping(context.Background()).Result(); err != nil {
log.Error("Failed to connect to Redis", zap.Error(err))
panic(err)
}
fmt.Println("redis lock初始化完毕")
})
}
// RedisLock 分布式锁结构
type RedisLock struct {
redisClient *redis.Client
key string
id string
expiry time.Duration
retryCount int // 重试次数
retryInterval time.Duration // 重试间隔时间
seconds uint32
}
// NewRedisLock 创建一个新的 Redis 锁
func NewRedisLock(key string, timeout uint32, retryCount int, retryInterval time.Duration) *RedisLock {
return &RedisLock{
redisClient: clientRedisLock,
key: key,
id: utility.GetXid(),
expiry: time.Duration(timeout) * time.Second,
retryCount: retryCount,
retryInterval: retryInterval,
seconds: timeout,
}
}
// Acquire 获取锁
func (rl *RedisLock) Acquire() (bool, error) {
return rl.acquireCtx(context.Background())
}
// AcquireWait 获取锁,支持重试和超时
func (rl *RedisLock) AcquireWait(ctx context.Context) (bool, error) {
if ctx == nil {
ctx = context.Background()
}
for i := 0; i < rl.retryCount; i++ {
if i > 0 {
// 指数退避
baseInterval := time.Duration(int64(rl.retryInterval) * (1 << i)) // 指数增长
if baseInterval > time.Second {
baseInterval = time.Second
}
if baseInterval <= 0 {
baseInterval = time.Millisecond * 100 // 至少 100ms
}
// 随机退避
retryInterval := time.Duration(rand.Int63n(int64(baseInterval))) // 随机退避
if retryInterval < time.Millisecond*100 {
retryInterval = time.Millisecond * 100 // 至少 100ms
}
select {
case <-ctx.Done():
return false, ctx.Err()
case <-time.After(retryInterval):
}
}
ok, err := rl.acquireCtx(ctx)
if ok {
return true, nil
}
if err != nil {
log.Error("Failed to acquire lock", zap.String("key", rl.key), zap.Error(err))
}
}
return false, ErrFailed
}
func safeRandomDuration(max time.Duration) time.Duration {
if max <= 0 {
return 100 * time.Millisecond // fallback default
}
return time.Duration(rand.Int63n(int64(max)))
}
// Release 释放锁
func (rl *RedisLock) Release() (bool, error) {
return rl.releaseCtx(context.Background())
}
// Touch 续期锁
func (rl *RedisLock) Touch() (bool, error) {
return rl.touchCtx(context.Background())
}
// acquireCtx 获取锁的核心逻辑
func (rl *RedisLock) acquireCtx(ctx context.Context) (bool, error) {
resp, err := rl.redisClient.Eval(ctx, lockCommand, []string{rl.key}, []string{
rl.id, strconv.Itoa(int(rl.seconds)*millisPerSecond + tolerance),
}).Result()
if err == redis.Nil {
return false, nil
} else if err != nil {
log.Error("Error acquiring lock", zap.String("key", rl.key), zap.Error(err))
return false, err
}
reply, ok := resp.(string)
return ok && reply == "OK", nil
}
// releaseCtx 释放锁的核心逻辑
func (rl *RedisLock) releaseCtx(ctx context.Context) (bool, error) {
resp, err := rl.redisClient.Eval(ctx, delCommand, []string{rl.key}, []string{rl.id}).Result()
if err != nil {
log.Error("Error releasing lock", zap.String("key", rl.key), zap.Error(err))
return false, err
}
reply, ok := resp.(int64)
return ok && reply == 1, nil
}
// touchCtx 续期锁的核心逻辑
func (rl *RedisLock) touchCtx(ctx context.Context) (bool, error) {
seconds := atomic.LoadUint32(&rl.seconds)
resp, err := rl.redisClient.Eval(ctx, touchCommand, []string{rl.key}, []string{
rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
}).Result()
if err != nil {
log.Error("Error touching lock", zap.String("key", rl.key), zap.Error(err))
return false, err
}
reply, ok := resp.(int64)
return ok && reply == 1, nil
}