go sync包 锁 chan 等

sync.Mutex

互斥锁 同一时间只能有一个线程进入

当你使用mutex时,确保mutex和其保护的变量没有被导出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 
import "sync"

type Info struct {
	mu sync.Mutex
	// ... other fields, e.g.: Str string
}

func Update(info *Info) {
	info.mu.Lock()
    // critical section:
    info.Str = // new value
    // end critical section
    info.mu.Unlock()
}


sync.RWMutex读写锁

针对读写操作的互斥锁,它可以分别针对读操作和写操作进行锁定和解锁操作。读写锁遵循的访问控制规则与互斥锁有所不同。

使用情况多读单写 多个只读操作并行执行,但写操作会完全互斥。

chan

goroutine 轻量级线程

协程是轻量的,比线程更轻。它们痕迹非常不明显(使用少量的内存和资源):使用 4K 的栈内存就可以在堆中创建它们。因为创建非常廉价, 必要的时候可以轻松创建并运行大量的协程(在同一个地址空间中 100,000 个连续的协程)。 并且它们对栈进行了分割,从而动态的增加(或缩减)内存的使用;栈的管理是自动的,但不是由垃圾回收器管理的,而是在协程退出后自动释放。

goroutine 与其他协程不一样

  • Go 协程意味着并行(或者可以以并行的方式部署),协程一般来说不是这样的
  • Go 协程通过通道来通信;协程通过让出和恢复操作来通信

chan 数据共享

1
2
// 发送和接收操作在另一端准备好之前都会阻塞 同步且无缓冲
a := make(chan time.Duration)
1
2
ch <- v    // 将 v 发送至信道 ch。
v := <-ch  // 从 ch 接收值并赋予 v。

buffer

始化一个带缓冲的信道

仅当信道的缓冲区填满后,向其发送数据时才会阻塞。当缓冲区为空时,接受方会阻塞。

通道可以同时容纳的元素个数 缓冲容量和类型无关。

发生 panic 的情况有三种

  • 向一个关闭的 channel 进行写操作;
  • 关闭一个 nil 的 channel;
  • 重复关闭一个 channel。

读、写一个 nil channel 都会被阻塞。

带不带缓冲区别

带缓冲区的channel 写入阻塞条件:缓冲区满 取出阻塞条件:缓冲区没有数据

不带缓冲区的channel 写入阻塞条件:同一时间没有另外一个线程对该chan进行读操作 取出阻塞条件:同一时间没有另外一个线程对该chan进行取操作

close

  • 不改变 channel 自身状态的情况下,无法获知一个 channel 是否关闭
  • 关闭一个 closed channel 会导致 panic
  • 向一个 closed channel 发送数据会导致 panic

不要从一个 receiver 侧关闭 channel,也不要在有多个 sender 时,关闭 channel。

向 channel 发送元素的就是 sender,因此 sender 可以决定何时不发送数据,并且关闭 channel。但是如果有多个 sender,某个 sender 同样没法确定其他 sender 的情况,这时也不能贸然关闭 channel

如何优雅close

  • 1个 sender,1/M 个 receiver

    只有1个 sender 直接从 sender 端关闭就好了

  • N 个 sender,一个 reciver

    增加一个传递关闭信号的 channel(closeSender),receiver 关闭closeSender。senders 监听到关闭信号后,停止发送数据。优雅地关闭 channel 就是不关闭 channel,让 gc 代劳

  • N 个 sender, M 个 receiver

    第 n 个 send 一定 happened before 第 n 个 receive finished,无论是缓冲型还是非缓冲型的 channel。

    对于容量为 m 的缓冲型 channel,第 n 个 receive 一定 happened before 第 n+m 个 send finished

    对于非缓冲型的 channel,第 n 个 receive 一定 happened before 第 n 个 send finished

    channel close 一定 happened before receiver 得到通知。

chan原理

  • 先从 Channel 读取数据的 Goroutine 会先接收到数据
  • 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利
  • 发送方会向缓冲区中写入数据,然后唤醒接收方,多个接收方会尝试从缓冲区中读取数据,如果没有读取到会重新陷入休眠;
  • 接收方会从缓冲区中读取数据,然后唤醒发送方,发送方会尝试向缓冲区写入数据,如果缓冲区已满会重新陷入休眠;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
type hchan struct {
	qcount   uint			//Channel 中的元素个数
	dataqsiz uint			// Channel 中的循环队列的长度
	buf      unsafe.Pointer //Channel 的缓冲区数据指针
	elemsize uint16
	closed   uint32
	elemtype *_type
	sendx    uint  //Channel 的发送操作处理到的位置
	recvx    uint  //Channel 的接收操作处理到的位置
	recvq    waitq
	sendq    waitq // 缓冲区空间不足而阻塞的 Goroutine 列表

	lock mutex
}

这些等待队列使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog 结构:

1
2
3
4
type waitq struct {
	first *sudog
	last  *sudog
}

发送数据

  • 当存在等待的接收者时,通过 runtime.send 直接将数据发送给阻塞的接收者;
  • 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区;
  • 当不存在缓冲区或者缓冲区已满时,等待其他 Goroutine 从 Channel 接收数据;

有等待的接收者

  • 如果目标 Channel 没有被关闭并且已经有处于读等待的 Goroutine,那么 runtime.chansend 会从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向它发送数据
  • 调用 runtime.sendDirect 将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上;
  • 调用 runtime.goready 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;发送数据的过程只是将接收方的 Goroutine 放到了处理器的 runnext 中,程序没有立刻执行该 Goroutine

缓冲区并且 Channel 中的数据没有装满

  • 在这里我们首先会使用 runtime.chanbuf 计算出下一个可以存储数据的位置,然后通过 runtime.typedmemmove 将发送的数据拷贝到缓冲区中并增加 sendx 索引和 qcount 计数器。
  • 因为这里的 buf 是一个循环数组,所以当 sendx 等于 dataqsiz 时会重新回到数组开始的位置。

没有接收者

  • 调用 runtime.getg 获取发送数据使用的 Goroutine;

  • 执行 runtime.acquireSudog 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息,例如发送的 Channel、是否在 select 中和待发送数据的内存地址等;

  • 将刚刚创建并初始化的 runtime.sudog 加入发送等待队列,并设置到当前 Goroutine 的 waiting 上,表示 Goroutine 正在等待该 sudog 准备就绪;

  • 调用 runtime.goparkunlock 将当前的 Goroutine 陷入沉睡等待唤醒;

  • 被调度器唤醒后会执行一些收尾工作,将一些属性置零并且释放 runtime.sudog 结构体;

  • 函数在最后会返回 true 表示这次我们已经成功向 Channel 发送了数据。

总结

  1. 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;
  2. 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上;
  3. 如果不满足上面的两种情况,会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;

发送数据的过程中包含几个会触发 Goroutine 调度的时机:

  1. 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度;
  2. 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 Channel 的 sendq 队列并调用 runtime.goparkunlock 触发 Goroutine 的调度让出处理器的使用权;

接收数据

  • 当存在等待的发送者时,通过 runtime.recv 从阻塞的发送者或者缓冲区中获取数据;
  • 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据;
  • 当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据

总结

  1. 如果 Channel 为空,那么会直接调用 runtime.gopark 挂起当前 Goroutine;
  2. 如果 Channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回;
  3. 如果 Channel 的 sendq 队列中存在挂起的 Goroutine,会将 recvx 索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 Goroutine 的数据拷贝到缓冲区;
  4. 如果 Channel 的缓冲区中包含数据,那么直接读取 recvx 索引对应的数据;
  5. 在默认情况下会挂起当前的 Goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒;

我们总结一下从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:

  1. 当 Channel 为空时;
  2. 当缓冲区中不存在数据并且也不存在数据的发送者时;

select

select 会阻塞到某个分支可以继续执行为止,这时就会执行该分支。当多个分支都准备好时会随机选择一个执行。

当 select 中的其它分支都没有准备好时,default 分支就会执行。 为了在尝试发送或者接收时不发生阻塞,可使用 default 分支

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
CheckValidDuration = make(chan time.Duration)

go func() {
		CheckValidDuration <- GetInitTimeDuration()
	}()


func checkExpire() {
	timeDuration := GetInitTimeDuration()
	for {
		ok := license.Lcache.Valid()
		if !ok {
			log4go.Error("sensor license expire time")
		}
		select {
		case <-time.After(timeDuration):
		case x := <-license.CheckValidDuration:
			timeDuration = x
			log4go.Info("update timeDuration : %d", timeDuration)
		}
	}
}

死锁

  • 数据要发送,但是没有人接收
  • 数据要接收,但是没有人发送
1
2
3
a := make(chan int)
a <- 1   //将数据写入channel
z := <-a //从channel中读取数据
  • 解决办法

使用协程配对

1
2
3
4
5
6
chanInt := make(chan int)
go func() {
    chanInt <- 1
}()

res := <-chanInt

配对可以让死锁消失,但发送多个值的时候又无法配对了,又会死锁

改为缓冲通道

1
2
3
chanInt := make(chan int,1)
chanInt <- 2
res := <-chanInt

go 泄漏

  • 当一个 goroutine 完成了它的工作
  • 由于发生了没有处理的错误
  • 有其他的协程告诉它终止

goroutine 终止的场景有三个

阻塞,goroutine进入死循环也是泄露的原因

原子操作

mutex会阻塞其他goroutines,比原子操作慢

每次调用从sync/atomic包转换为一组特殊的机器指令,这些机器指令基本上在CPU级别上运行

1
2
3
4
5
6
7
8
9
10
11
type ThreatActionCountCache struct {
	Data  map[uint8]uint64
	Date  uint32
	Mutex sync.Mutex
}

func AddThreatActionCount(threatActionType uint8) {
	threatActionCache.Mutex.Lock()
	defer threatActionCache.Mutex.Unlock()
	threatActionCache.Data[threatActionType] += 1
}
1
2
3
4
5
6
7
8
9
var (
	trafficSizeTotal     uint64
	trafficSizeDateTotal uint64
)

func AddTrafficSize(size uint64) {
	atomic.AddUint64(&trafficSizeTotal, size)
	atomic.AddUint64(&trafficSizeDateTotal, size)
}

gc 调优 Pool

当多个 goroutine 都需要创建同⼀个对象的时候,如果 goroutine 数过多,导致对象的创建数⽬剧增,进⽽导致 GC 压⼒增大。形成 “并发⼤-占⽤内存⼤-GC 缓慢-处理并发能⼒降低-并发更⼤”这样的恶性循环,在这个时候,需要有⼀个对象池,每个 goroutine 不再⾃⼰单独创建对象,⽽是从对象池中获取出⼀个对象。

defer不能随便用

全局来看,它的损耗非常小,性能有大幅度提升,在go 1.14里用不用defer影响甚微

1
2
3
4
5
6
7
resp, err := http.Get(...)
if err != nil {
    return err
}
defer resp.Body.Close()
// do something
time.Sleep(time.Second * 60)

如果是这种代码,在保证无异常的情况下确保尽早关闭才是首选,一个请求当然没问题,流量、并发一下子大了呢,那可能就是个灾难了。

context

在Go 里,我们不能直接杀死协程,协程的关闭一般会用 channel+select 方式来控制。但是在某些场景下,例如处理一个请求衍生了很多协程,这些协程之间是相互关联的:需要共享一些全局变量、有共同的 deadline 等,而且可以同时被关闭。再用 channel+select 就会比较麻烦,这时就可以通过 context 来实现。

context 用来解决 goroutine 之间退出通知元数据传递的功能

1
2
3
4
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
  • 不要将 Context 塞到结构体里。直接将 Context 类型作为函数的第一参数,而且一般都命名为 ctx。

  • 不要向函数传入一个 nil 的 context,如果你实在不知道传什么,context.TODO 。

  • 不要把本应该作为函数参数的类型塞到 context 中,context 存储的应该是一些共同的数据。例如:登陆的 session、cookie 等。

  • 同一个 context 可能会被传递到多个 goroutine,别担心,context 是并发安全的。

1
func Background() Context

background 是一个空的 context, 它不能被取消,没有值,也没有超时时间。

传值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
    "context"
    "fmt"
)

func main() {
    ctx := context.Background()
    process(ctx)

    ctx = context.WithValue(ctx, "traceId", "qcrao-2019")
    process(ctx)
}

func process(ctx context.Context) {
    traceId, ok := ctx.Value("traceId").(string)
    if ok {
        fmt.Printf("process over. trace_id=%s\n", traceId)
    } else {
        fmt.Printf("process over. no trace_id\n")
    }
}

超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func Perform(ctx context.Context) {
	for {
		fmt.Println("")

		select {
		case <-ctx.Done():
			fmt.Println("ctx超时被取消,直接返回")
			return
		case <-time.After(time.Second):
			fmt.Println("block 1 秒钟")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	go ctx_test.Perform(ctx)

	time.Sleep(time.Second * 5)
	cancel()

}

WithTimeOut 函数返回的 context 和 cancelFun 是分开的。context 本身并没有取消函数,这样做的原因是取消函数只能由外层函数调用,防止子节点 context 调用取消函数。