Files
exchange_go/pkg/utility/cmap/iterator.go

161 lines
4.4 KiB
Go
Raw Permalink Normal View History

2025-02-06 11:14:33 +08:00
package cmap
import (
"sync"
"github.com/bytedance/sonic"
)
// 迭代器部分
// Tuple Used by the Iter & IterBuffered functions to wrap two variables together over a channel
// 由Iter & IterBuffered函数使用在一个通道上封装两个变量
type Tuple struct {
Key string
Val interface{}
}
// Iter returns an iterator which could be used in a for range loop.
// 返回一个可用于for范围循环的迭代器。
// Deprecated: using IterBuffered() will get a better performence
// 使用IterBuffered()将获得更好的性能
func (m ConcurrentMap) Iter() <-chan Tuple {
chans := snapshot(m)
ch := make(chan Tuple) // 不带缓冲
go fanIn(chans, ch)
return ch
}
// IterBuffered returns a buffered iterator which could be used in a for range loop.
// 返回一个可用于for范围循环的缓冲迭代器。
func (m ConcurrentMap) IterBuffered() <-chan Tuple {
chans := snapshot(m)
total := 0
for _, c := range chans {
total += cap(c)
}
ch := make(chan Tuple, total) // 一次性写完到缓冲中
go fanIn(chans, ch)
return ch
}
// Returns a array of channels that contains elements in each shard,
// which likely takes a snapshot of `m`.
// It returns once the size of each buffered channel is determined,
// before all the channels are populated using goroutines.
// 返回一个通道数组其中包含每个shard中的元素它可能会获取' m '的快照。
// 一旦确定了每个缓冲通道的大小在使用goroutines填充所有通道之前它将返回。
func snapshot(m ConcurrentMap) (chans []chan Tuple) {
chans = make([]chan Tuple, ShardCount)
wg := sync.WaitGroup{}
wg.Add(ShardCount)
// Foreach shard.
for index, shard := range m {
go func(index int, shard *ConcurrentMapShared) {
shard.RLock()
chans[index] = make(chan Tuple, len(shard.items))
wg.Done() // 只要创建了通道就不用再阻塞了
for key, val := range shard.items {
chans[index] <- Tuple{key, val}
}
shard.RUnlock()
close(chans[index])
}(index, shard)
}
wg.Wait()
return chans
}
// fanIn reads elements from channels `chans` into channel `out`
// 从通道' chans '读取元素到通道' out '
func fanIn(chans []chan Tuple, out chan Tuple) {
wg := sync.WaitGroup{}
wg.Add(len(chans))
for _, ch := range chans {
go func(ch chan Tuple) {
for t := range ch {
out <- t
}
wg.Done()
}(ch)
}
wg.Wait()
close(out)
}
// Items returns all items as map[string]interface{}
// 返回所有条目作为map[string]interface{}
func (m ConcurrentMap) Items() map[string]interface{} {
tmp := make(map[string]interface{})
// Insert items to temporary map. 向临时映射中插入项目。
for item := range m.IterBuffered() {
tmp[item.Key] = item.Val
}
return tmp
}
// IterCb Iterator callback,called for every key,value found in
// maps. RLock is held for all calls for a given shard
// therefore callback sess consistent view of a shard,
// but not across the shards
// 迭代器回调函数在map中找到的每个键和值都会被调用。
// RLock对给定分片的所有调用都保持因此回调获得一个分片的一致视图但不跨分片
type IterCb func(key string, v interface{})
// IterCb Callback based iterator, cheapest way to read all elements in a map.
// 基于回调的迭代器,读取映射中所有元素的最便宜方法。
func (m ConcurrentMap) IterCb(fn IterCb) {
for idx := range m {
shard := (m)[idx]
shard.RLock()
for key, value := range shard.items {
fn(key, value)
}
shard.RUnlock()
}
}
// Keys returns all keys as []string
// 返回所有键为[]字符串
func (m ConcurrentMap) Keys() []string {
count := m.Count()
ch := make(chan string, count)
go func() {
// Foreach shard.
wg := sync.WaitGroup{}
wg.Add(ShardCount)
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
// Foreach key, value pair.
shard.RLock()
for key := range shard.items {
ch <- key
}
shard.RUnlock()
wg.Done()
}(shard)
}
wg.Wait()
close(ch)
}()
// Generate keys
keys := make([]string, 0, count)
for k := range ch {
keys = append(keys, k)
}
return keys
}
// MarshalJSON Reviles ConcurrentMap "private" variables to json marshal.
// 将存储的所有数据json序列化输出
func (m ConcurrentMap) MarshalJSON() ([]byte, error) {
tmp := make(map[string]interface{})
for item := range m.IterBuffered() {
tmp[item.Key] = item.Val
}
return sonic.Marshal(tmp)
}