1
Some checks failed
Build / build (push) Has been cancelled
CodeQL / Analyze (go) (push) Has been cancelled
build / Build (push) Has been cancelled
GitHub Actions Mirror / mirror_to_gitee (push) Has been cancelled
GitHub Actions Mirror / mirror_to_gitlab (push) Has been cancelled
Issue Close Require / issue-close-require (push) Has been cancelled
Issue Check Inactive / issue-check-inactive (push) Has been cancelled
Some checks failed
Build / build (push) Has been cancelled
CodeQL / Analyze (go) (push) Has been cancelled
build / Build (push) Has been cancelled
GitHub Actions Mirror / mirror_to_gitee (push) Has been cancelled
GitHub Actions Mirror / mirror_to_gitlab (push) Has been cancelled
Issue Close Require / issue-close-require (push) Has been cancelled
Issue Check Inactive / issue-check-inactive (push) Has been cancelled
This commit is contained in:
195
utils/redishelper/redis_lock.go
Normal file
195
utils/redishelper/redis_lock.go
Normal file
@ -0,0 +1,195 @@
|
||||
package redishelper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"go-admin/utils/utility"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
log "github.com/go-admin-team/go-admin-core/logger"
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
const (
|
||||
tolerance = 500 // milliseconds
|
||||
millisPerSecond = 1000
|
||||
lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||
redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
|
||||
return "OK"
|
||||
else
|
||||
return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
|
||||
end`
|
||||
delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||
return redis.call("DEL", KEYS[1])
|
||||
else
|
||||
return 0
|
||||
end`
|
||||
touchCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
|
||||
else
|
||||
return 0
|
||||
end`
|
||||
)
|
||||
|
||||
var (
|
||||
clientRedisLock *redis.Client
|
||||
onece sync.Once
|
||||
ErrFailed = errors.New("redsync: failed to acquire lock")
|
||||
)
|
||||
|
||||
// InitLockRedisConn 初始化 Redis 连接
|
||||
func InitLockRedisConn(host, password, dbIndex string) {
|
||||
onece.Do(func() {
|
||||
dbIndexInt, err := strconv.Atoi(dbIndex)
|
||||
if err != nil {
|
||||
dbIndexInt = 0
|
||||
}
|
||||
clientRedisLock = redis.NewClient(&redis.Options{
|
||||
Addr: host,
|
||||
Password: password,
|
||||
DB: dbIndexInt,
|
||||
})
|
||||
|
||||
// 测试连接
|
||||
if _, err := clientRedisLock.Ping(context.Background()).Result(); err != nil {
|
||||
log.Errorf("Failed to connect to Redis :%v", err)
|
||||
panic(err)
|
||||
}
|
||||
|
||||
fmt.Println("redis lock初始化完毕")
|
||||
})
|
||||
}
|
||||
|
||||
// RedisLock 分布式锁结构
|
||||
type RedisLock struct {
|
||||
redisClient *redis.Client
|
||||
key string
|
||||
id string
|
||||
expiry time.Duration
|
||||
retryCount int // 重试次数
|
||||
retryInterval time.Duration // 重试间隔时间
|
||||
seconds uint32
|
||||
}
|
||||
|
||||
// NewRedisLock 创建一个新的 Redis 锁
|
||||
func NewRedisLock(key string, timeout uint32, retryCount int, retryInterval time.Duration) *RedisLock {
|
||||
return &RedisLock{
|
||||
redisClient: clientRedisLock,
|
||||
key: key,
|
||||
id: utility.GetXid(),
|
||||
expiry: time.Duration(timeout) * time.Second,
|
||||
retryCount: retryCount,
|
||||
retryInterval: retryInterval,
|
||||
seconds: timeout,
|
||||
}
|
||||
}
|
||||
|
||||
// Acquire 获取锁
|
||||
func (rl *RedisLock) Acquire() (bool, error) {
|
||||
return rl.acquireCtx(context.Background())
|
||||
}
|
||||
|
||||
// AcquireWait 获取锁,支持重试和超时
|
||||
func (rl *RedisLock) AcquireWait(ctx context.Context) (bool, error) {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
for i := 0; i < rl.retryCount; i++ {
|
||||
if i > 0 {
|
||||
// 指数退避
|
||||
baseInterval := time.Duration(int64(rl.retryInterval) * (1 << i)) // 指数增长
|
||||
if baseInterval > time.Second {
|
||||
baseInterval = time.Second
|
||||
}
|
||||
|
||||
if baseInterval <= 0 {
|
||||
baseInterval = time.Millisecond * 100 // 至少 100ms
|
||||
}
|
||||
// 随机退避
|
||||
retryInterval := time.Duration(rand.Int63n(int64(baseInterval))) // 随机退避
|
||||
if retryInterval < time.Millisecond*100 {
|
||||
retryInterval = time.Millisecond * 100 // 至少 100ms
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false, ctx.Err()
|
||||
case <-time.After(retryInterval):
|
||||
}
|
||||
}
|
||||
|
||||
ok, err := rl.acquireCtx(ctx)
|
||||
if ok {
|
||||
return true, nil
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorf("Failed to acquire lock %s:%v", rl.key, err)
|
||||
}
|
||||
}
|
||||
return false, ErrFailed
|
||||
}
|
||||
|
||||
func safeRandomDuration(max time.Duration) time.Duration {
|
||||
if max <= 0 {
|
||||
return 100 * time.Millisecond // fallback default
|
||||
}
|
||||
return time.Duration(rand.Int63n(int64(max)))
|
||||
}
|
||||
|
||||
// Release 释放锁
|
||||
func (rl *RedisLock) Release() (bool, error) {
|
||||
return rl.releaseCtx(context.Background())
|
||||
}
|
||||
|
||||
// Touch 续期锁
|
||||
func (rl *RedisLock) Touch() (bool, error) {
|
||||
return rl.touchCtx(context.Background())
|
||||
}
|
||||
|
||||
// acquireCtx 获取锁的核心逻辑
|
||||
func (rl *RedisLock) acquireCtx(ctx context.Context) (bool, error) {
|
||||
resp, err := rl.redisClient.Eval(ctx, lockCommand, []string{rl.key}, []string{
|
||||
rl.id, strconv.Itoa(int(rl.seconds)*millisPerSecond + tolerance),
|
||||
}).Result()
|
||||
if err == redis.Nil {
|
||||
return false, nil
|
||||
} else if err != nil {
|
||||
log.Error("Error acquiring lock key:%s %v", rl.key, err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
reply, ok := resp.(string)
|
||||
return ok && reply == "OK", nil
|
||||
}
|
||||
|
||||
// releaseCtx 释放锁的核心逻辑
|
||||
func (rl *RedisLock) releaseCtx(ctx context.Context) (bool, error) {
|
||||
resp, err := rl.redisClient.Eval(ctx, delCommand, []string{rl.key}, []string{rl.id}).Result()
|
||||
if err != nil {
|
||||
log.Error("Error releasing lock key:%s %v", rl.key, err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
reply, ok := resp.(int64)
|
||||
return ok && reply == 1, nil
|
||||
}
|
||||
|
||||
// touchCtx 续期锁的核心逻辑
|
||||
func (rl *RedisLock) touchCtx(ctx context.Context) (bool, error) {
|
||||
seconds := atomic.LoadUint32(&rl.seconds)
|
||||
resp, err := rl.redisClient.Eval(ctx, touchCommand, []string{rl.key}, []string{
|
||||
rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
|
||||
}).Result()
|
||||
if err != nil {
|
||||
log.Error("Error touching lock key:%s %v", rl.key, err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
reply, ok := resp.(int64)
|
||||
return ok && reply == 1, nil
|
||||
}
|
||||
Reference in New Issue
Block a user