Golang sync 包解析

Posted by Wang Gang on 2019-08-21

包的用途

主要用于在并发中(同步机制,原子操作和对象池)

原子操作 atomic.Value

使用的是汇编语言编写,在sync/atomic包中,
在sync.atomic包中,有两个函数,Value.Store()Value.Load()用于对内存中的数据进行原子的读和写,sync中的很多操作都是采用这样的方式对数据的于原子操作

Map 并发Map(1.9更新)

1
2
3
4
5
6
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}

方法

1
2
3
4
5
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {} //读取map中的值,ok反应是否存在key
func (m *Map) Store(key, value interface{}) {} //原子操作存取数据到map中
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {}
func (m *Map) Delete(key interface{}) {} //删除map中的key
func (m *Map) Range(f func(key, value interface{}) bool) {} //遍历sync.map中的数据
  • 在Go 1.9中sync.Map是怎么实现的呢?它是如何解决并发提升性能的呢?
  • sync.Map的实现有几个优化点,这里先列出来,我们后面慢慢分析。
  • 空间换时间。 通过冗余的两个数据结构(read、dirty),实现加锁对性能的影响。
  • 使用只读数据(read),避免读写冲突。
  • 动态调整,miss次数多了之后,将dirty数据提升为read。
  • double-checking。
  • 延迟删除。 删除一个键值只是打标记,只有在提升dirty的时候才清理删除的数据。
  • 优先从read读取、更新、删除,因为对read的读取不需要锁。

Mutex/RWMutex 读写锁

  • Mutex
1
2
3
4
type Mutex struct {
state int32
sema uint32
}

Lock()方法
* cas操作修改锁状态到锁定,如果锁是锁定状态,不能修改
* 则会进入自旋锁,忙等待锁的Unlock,由于自旋锁是有限制的,当到达上线后
* 就会通过信号量,收到信号量后才能被唤醒,锁定
Unlock()方法
* case操作修改锁状态到解锁,并且释放信号量

  • RWMutex
1
2
3
4
5
6
7
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}

对象中有读(readerSem)和写(writerSem)两个信号量,相比Mutex,没了自旋锁的,直接获取信号量

ps:RWMutex主要应用在读多写少的场景中

Cond

1
2
3
4
5
6
type Cond struct {
noCopy noCopy //等待计数器
L Locker //锁
notify notifyList //信号量
checker copyChecker
}
  • 在等待一个goruntime执行完成后让使得另一个goruntime执行的话就可以使用cond这样的对象
    usage:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main
import (
"fmt"
"sync"
"time"
)
func main() {
cond := sync.NewCond(new(sync.Mutex))
go func(cd *sync.Cond) {
cd.L.Lock()
cd.Wait()
fmt.Println("after single...")
cd.L.Unlock()
}(cond)

time.Sleep(time.Second * 2)
cond.L.Lock()
cond.Signal()
fmt.Println("single...")
cond.L.Unlock()
time.Sleep(time.Second * 2)
}

WaitGroup

1
2
3
4
5
type WaitGroup struct {
noCopy noCopy
state1 [12]byte //计数器
sema uint32
}
  • 在多个goruntime执行完成后才能去执行操作使用
  • 必须等上一次使用中所有的Wait均已返回才行
  • 知道计数器值为0,才往下执行
    usage:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main
import (
"fmt"
"sync"
"time"
)
func main() {
wg := new(sync.WaitGroup)
for i := 0; i < 5; i++ {
go func(i int) {
wg.Add(1)
defer wg.Done()
time.Sleep(time.Second * 2)
fmt.Println("exec %d goruntime...", i)
}(i)
}
time.Sleep(time.Second)
wg.Wait()
fmt.Println("after all goruntime exec over....")
}

Once 判断函数是否执行(原子操作)

1
2
3
4
type Once struct {
m Mutex
done uint32
}
  • 如果函数是已经执行的函数,原子判断是否为done如果是的话,就会返回
  • 如果没有执行过,就会加锁后,对该函数进行执行(原子操作)

usage:

1
2
3
4
5
6
7
8
9
10
11
12
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
// Slow-path.
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}