161 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			161 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| 
								 | 
							
								package cmap
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								import (
							 | 
						|||
| 
								 | 
							
									"sync"
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									"github.com/bytedance/sonic"
							 | 
						|||
| 
								 | 
							
								)
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// 迭代器部分
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// Tuple Used by the Iter & IterBuffered functions to wrap two variables together over a channel
							 | 
						|||
| 
								 | 
							
								// 由Iter & IterBuffered函数使用,在一个通道上封装两个变量,
							 | 
						|||
| 
								 | 
							
								type Tuple struct {
							 | 
						|||
| 
								 | 
							
									Key string
							 | 
						|||
| 
								 | 
							
									Val interface{}
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// Iter returns an iterator which could be used in a for range loop.
							 | 
						|||
| 
								 | 
							
								// 返回一个可用于for范围循环的迭代器。
							 | 
						|||
| 
								 | 
							
								// Deprecated: using IterBuffered() will get a better performence
							 | 
						|||
| 
								 | 
							
								// 使用IterBuffered()将获得更好的性能
							 | 
						|||
| 
								 | 
							
								func (m ConcurrentMap) Iter() <-chan Tuple {
							 | 
						|||
| 
								 | 
							
									chans := snapshot(m)
							 | 
						|||
| 
								 | 
							
									ch := make(chan Tuple) // 不带缓冲
							 | 
						|||
| 
								 | 
							
									go fanIn(chans, ch)
							 | 
						|||
| 
								 | 
							
									return ch
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// IterBuffered returns a buffered iterator which could be used in a for range loop.
							 | 
						|||
| 
								 | 
							
								// 返回一个可用于for范围循环的缓冲迭代器。
							 | 
						|||
| 
								 | 
							
								func (m ConcurrentMap) IterBuffered() <-chan Tuple {
							 | 
						|||
| 
								 | 
							
									chans := snapshot(m)
							 | 
						|||
| 
								 | 
							
									total := 0
							 | 
						|||
| 
								 | 
							
									for _, c := range chans {
							 | 
						|||
| 
								 | 
							
										total += cap(c)
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
									ch := make(chan Tuple, total) // 一次性写完到缓冲中
							 | 
						|||
| 
								 | 
							
									go fanIn(chans, ch)
							 | 
						|||
| 
								 | 
							
									return ch
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// Returns a array of channels that contains elements in each shard,
							 | 
						|||
| 
								 | 
							
								// which likely takes a snapshot of `m`.
							 | 
						|||
| 
								 | 
							
								// It returns once the size of each buffered channel is determined,
							 | 
						|||
| 
								 | 
							
								// before all the channels are populated using goroutines.
							 | 
						|||
| 
								 | 
							
								// 返回一个通道数组,其中包含每个shard中的元素,它可能会获取' m '的快照。
							 | 
						|||
| 
								 | 
							
								// 一旦确定了每个缓冲通道的大小,在使用goroutines填充所有通道之前,它将返回。
							 | 
						|||
| 
								 | 
							
								func snapshot(m ConcurrentMap) (chans []chan Tuple) {
							 | 
						|||
| 
								 | 
							
									chans = make([]chan Tuple, ShardCount)
							 | 
						|||
| 
								 | 
							
									wg := sync.WaitGroup{}
							 | 
						|||
| 
								 | 
							
									wg.Add(ShardCount)
							 | 
						|||
| 
								 | 
							
									// Foreach shard.
							 | 
						|||
| 
								 | 
							
									for index, shard := range m {
							 | 
						|||
| 
								 | 
							
										go func(index int, shard *ConcurrentMapShared) {
							 | 
						|||
| 
								 | 
							
											shard.RLock()
							 | 
						|||
| 
								 | 
							
											chans[index] = make(chan Tuple, len(shard.items))
							 | 
						|||
| 
								 | 
							
											wg.Done() // 只要创建了通道就不用再阻塞了
							 | 
						|||
| 
								 | 
							
											for key, val := range shard.items {
							 | 
						|||
| 
								 | 
							
												chans[index] <- Tuple{key, val}
							 | 
						|||
| 
								 | 
							
											}
							 | 
						|||
| 
								 | 
							
											shard.RUnlock()
							 | 
						|||
| 
								 | 
							
											close(chans[index])
							 | 
						|||
| 
								 | 
							
										}(index, shard)
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
									wg.Wait()
							 | 
						|||
| 
								 | 
							
									return chans
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// fanIn reads elements from channels `chans` into channel `out`
							 | 
						|||
| 
								 | 
							
								// 从通道' chans '读取元素到通道' out '
							 | 
						|||
| 
								 | 
							
								func fanIn(chans []chan Tuple, out chan Tuple) {
							 | 
						|||
| 
								 | 
							
									wg := sync.WaitGroup{}
							 | 
						|||
| 
								 | 
							
									wg.Add(len(chans))
							 | 
						|||
| 
								 | 
							
									for _, ch := range chans {
							 | 
						|||
| 
								 | 
							
										go func(ch chan Tuple) {
							 | 
						|||
| 
								 | 
							
											for t := range ch {
							 | 
						|||
| 
								 | 
							
												out <- t
							 | 
						|||
| 
								 | 
							
											}
							 | 
						|||
| 
								 | 
							
											wg.Done()
							 | 
						|||
| 
								 | 
							
										}(ch)
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
									wg.Wait()
							 | 
						|||
| 
								 | 
							
									close(out)
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// Items returns all items as map[string]interface{}
							 | 
						|||
| 
								 | 
							
								// 返回所有条目作为map[string]interface{}
							 | 
						|||
| 
								 | 
							
								func (m ConcurrentMap) Items() map[string]interface{} {
							 | 
						|||
| 
								 | 
							
									tmp := make(map[string]interface{})
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									// Insert items to temporary map. 向临时映射中插入项目。
							 | 
						|||
| 
								 | 
							
									for item := range m.IterBuffered() {
							 | 
						|||
| 
								 | 
							
										tmp[item.Key] = item.Val
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									return tmp
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// IterCb Iterator callback,called for every key,value found in
							 | 
						|||
| 
								 | 
							
								// maps. RLock is held for all calls for a given shard
							 | 
						|||
| 
								 | 
							
								// therefore callback sess consistent view of a shard,
							 | 
						|||
| 
								 | 
							
								// but not across the shards
							 | 
						|||
| 
								 | 
							
								// 迭代器回调函数,在map中找到的每个键和值都会被调用。
							 | 
						|||
| 
								 | 
							
								// RLock对给定分片的所有调用都保持,因此回调获得一个分片的一致视图,但不跨分片
							 | 
						|||
| 
								 | 
							
								type IterCb func(key string, v interface{})
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// IterCb Callback based iterator, cheapest way to read all elements in a map.
							 | 
						|||
| 
								 | 
							
								// 基于回调的迭代器,读取映射中所有元素的最便宜方法。
							 | 
						|||
| 
								 | 
							
								func (m ConcurrentMap) IterCb(fn IterCb) {
							 | 
						|||
| 
								 | 
							
									for idx := range m {
							 | 
						|||
| 
								 | 
							
										shard := (m)[idx]
							 | 
						|||
| 
								 | 
							
										shard.RLock()
							 | 
						|||
| 
								 | 
							
										for key, value := range shard.items {
							 | 
						|||
| 
								 | 
							
											fn(key, value)
							 | 
						|||
| 
								 | 
							
										}
							 | 
						|||
| 
								 | 
							
										shard.RUnlock()
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// Keys returns all keys as []string
							 | 
						|||
| 
								 | 
							
								// 返回所有键为[]字符串
							 | 
						|||
| 
								 | 
							
								func (m ConcurrentMap) Keys() []string {
							 | 
						|||
| 
								 | 
							
									count := m.Count()
							 | 
						|||
| 
								 | 
							
									ch := make(chan string, count)
							 | 
						|||
| 
								 | 
							
									go func() {
							 | 
						|||
| 
								 | 
							
										// Foreach shard.
							 | 
						|||
| 
								 | 
							
										wg := sync.WaitGroup{}
							 | 
						|||
| 
								 | 
							
										wg.Add(ShardCount)
							 | 
						|||
| 
								 | 
							
										for _, shard := range m {
							 | 
						|||
| 
								 | 
							
											go func(shard *ConcurrentMapShared) {
							 | 
						|||
| 
								 | 
							
												// Foreach key, value pair.
							 | 
						|||
| 
								 | 
							
												shard.RLock()
							 | 
						|||
| 
								 | 
							
												for key := range shard.items {
							 | 
						|||
| 
								 | 
							
													ch <- key
							 | 
						|||
| 
								 | 
							
												}
							 | 
						|||
| 
								 | 
							
												shard.RUnlock()
							 | 
						|||
| 
								 | 
							
												wg.Done()
							 | 
						|||
| 
								 | 
							
											}(shard)
							 | 
						|||
| 
								 | 
							
										}
							 | 
						|||
| 
								 | 
							
										wg.Wait()
							 | 
						|||
| 
								 | 
							
										close(ch)
							 | 
						|||
| 
								 | 
							
									}()
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
									// Generate keys
							 | 
						|||
| 
								 | 
							
									keys := make([]string, 0, count)
							 | 
						|||
| 
								 | 
							
									for k := range ch {
							 | 
						|||
| 
								 | 
							
										keys = append(keys, k)
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
									return keys
							 | 
						|||
| 
								 | 
							
								}
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								// MarshalJSON Reviles ConcurrentMap "private" variables to json marshal.
							 | 
						|||
| 
								 | 
							
								// 将存储的所有数据json序列化输出
							 | 
						|||
| 
								 | 
							
								func (m ConcurrentMap) MarshalJSON() ([]byte, error) {
							 | 
						|||
| 
								 | 
							
									tmp := make(map[string]interface{})
							 | 
						|||
| 
								 | 
							
									for item := range m.IterBuffered() {
							 | 
						|||
| 
								 | 
							
										tmp[item.Key] = item.Val
							 | 
						|||
| 
								 | 
							
									}
							 | 
						|||
| 
								 | 
							
									return sonic.Marshal(tmp)
							 | 
						|||
| 
								 | 
							
								}
							 |