Files
exchange_go/pkg/utility/cmap/concurrentmap.go

198 lines
5.7 KiB
Go
Raw Permalink Normal View History

2025-02-06 11:14:33 +08:00
package cmap
import (
"sync"
)
var SHARD_COUNT = 32
// 一个分片的map存储器 可并发
const ShardCount = 31 // 分区数量
// ConcurrentMap A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (ShardCount) map shards.
type ConcurrentMap []*ConcurrentMapShared // 分片存储map 可并发
// ConcurrentMapShared A "thread" safe string to anything map.
type ConcurrentMapShared struct {
items map[string]interface{}
sync.RWMutex // Read Write mutex, guards access to internal map.
}
// New Creates a new concurrent map.
func New() ConcurrentMap {
m := make(ConcurrentMap, SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
}
return m
}
// GetNoLock retrieves an element from map under given key.
func (m ConcurrentMap) GetNoLock(shard *ConcurrentMapShared, key string) (interface{}, bool) {
// Get item from shard.
val, ok := shard.items[key]
return val, ok
}
// SetNoLock retrieves an element from map under given key.
func (m ConcurrentMap) SetNoLock(shard *ConcurrentMapShared, key string, value interface{}) {
shard.items[key] = value
}
// NewConcurrentMap 创建
func NewConcurrentMap() ConcurrentMap {
m := make(ConcurrentMap, ShardCount)
for i := 0; i < ShardCount; i++ {
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
}
return m
}
// GetShard 返回给定键下的分片
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
return m[fnv32(key)&ShardCount]
}
// MSet 存储一组map
func (m ConcurrentMap) MSet(data map[string]interface{}) {
for key, value := range data {
shard := m.GetShard(key)
shard.Lock()
shard.items[key] = value
shard.Unlock()
}
}
// Set the given value under the specified key.
// 在指定的键下设置给定的值。
func (m ConcurrentMap) Set(key string, value interface{}) {
// Get map shard.
shard := m.GetShard(key)
shard.Lock()
shard.items[key] = value
shard.Unlock()
}
// UpsertCb Callback to return new element to be inserted into the map
// It is called while lock is held, therefore it MUST NOT
// try to access other keys in same map, as it can lead to deadlock since
// Go sync.RWLock is not reentrant
// 回调函数返回新元素插入到映射中。它在锁被持有时被调用,因此它一定不要试图访问同一映射中的其他键,因为它可能导致死锁。 RWLock是不可重入的
type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
// Upsert Insert or Update - updates existing element or inserts a new one using UpsertCb
// 插入或更新——使用UpsertCb更新现有元素或插入新元素
func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
shard := m.GetShard(key)
shard.Lock()
v, ok := shard.items[key]
res = cb(ok, v, value)
shard.items[key] = res
shard.Unlock()
return res
}
// SetIfAbsent Sets the given value under the specified key if no value was associated with it.
// 如果没有值与指定键关联,则在指定键下设置给定值。
func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool {
// Get map shard.
shard := m.GetShard(key)
shard.Lock()
_, ok := shard.items[key]
if !ok {
shard.items[key] = value
}
shard.Unlock()
return !ok
}
// Get retrieves an element from map under given key.
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
shard := m.GetShard(key)
shard.RLock()
val, ok := shard.items[key]
shard.RUnlock()
return val, ok
}
// Count returns the number of elements within the map.
func (m ConcurrentMap) Count() int {
count := 0
for i := 0; i < ShardCount; i++ {
shard := m[i]
shard.RLock()
count += len(shard.items)
shard.RUnlock()
}
return count
}
// Has Looks up an item under specified key 存在性
func (m ConcurrentMap) Has(key string) bool {
shard := m.GetShard(key)
shard.RLock()
_, ok := shard.items[key]
shard.RUnlock()
return ok
}
// Remove removes an element from the map. 移除
func (m ConcurrentMap) Remove(key string) {
// Try to get shard.
shard := m.GetShard(key)
shard.Lock()
delete(shard.items, key)
shard.Unlock()
}
// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
// If returns true, the element will be removed from the map
// 是一个在map.RemoveCb()调用中执行的回调函数当Lock被持有时如果返回true该元素将从map中移除
type RemoveCb func(key string, v interface{}, exists bool) bool
// RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params
// If callback returns true and element exists, it will remove it from the map
// Returns the value returned by the callback (even if element was not present in the map)
// 如果callback返回true且element存在则将其从map中移除。返回callback返回的值(即使element不存在于map中)
func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool {
shard := m.GetShard(key)
shard.Lock()
v, ok := shard.items[key]
remove := cb(key, v, ok)
if remove && ok {
delete(shard.items, key)
}
shard.Unlock()
return remove
}
// Pop removes an element from the map and returns it
// 从映射中移除一个元素并返回它
func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool) {
// Try to get shard.
shard := m.GetShard(key)
shard.Lock()
v, exists = shard.items[key]
delete(shard.items, key)
shard.Unlock()
return v, exists
}
// IsEmpty checks if map is empty. 是否是空的
func (m ConcurrentMap) IsEmpty() bool {
return m.Count() == 0
}
// 将键值映射为数字uint32
func fnv32(key string) uint32 {
const prime32 = uint32(16777619)
hash := uint32(2166136261)
for i := 0; i < len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
return hash
}