并发是指在同一时间内可以执行多个任务

Go语言通过编译器运行时(runtime),从语言上支持了并发的特性,通过goroutine特性完成并发。goroutine由Go语言的运行时调度完成,而线程是通过操作系统调度完成。

一、轻量级线程(goroutine)-根据需要随时创建的“线程”

Go程序从main包的main()函数开始,就会为mian()函数创建一个默认goroutine

1、使用普通函数创建goroutine

使用go关键字,将running()函数并发执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func running() {
var times int
// 构建一个无限循环
for {
times++
fmt.Println("tick", times)
// 延迟一秒
time.Sleep(time.Second)
}
}

func main() {
// 并发执行程序
go running()
// 接收命令行输入,不做任何事
var input string
fmt.Scanln(&input)
}

2、使用匿名函数创建goroutine

在main中创建一个匿名函数并为匿名函数启动goroutine

1
2
3
4
5
6
7
8
9
10
func main() {
go func() {
var times int
for {
times++;
fmt.Println("tick", times)
time.Sleep(time.Second)
}
}()
}

所有goroutine函数都会在main()函数结束以后一同结束

3、调整并发的运行性能(GOMAXPROCS)

一般情况下可以使用runtime.numCPU()查询cpu数量,并使用runtime.GOMAXPROCS()函数进行设置

4、并发和并行

  • 并发(concurrency):将任务在不同的时间点交给处理器进行处理
  • 并行(parallelism):将任务分配给每个单独的处理器独立完成

5、Go中的协程(goroutine)和普通协程(coroutine)

  • goroutine可能发生并行操作,coroutine始终顺序执行
  • goroutine见使用channel通信,coroutine使用yieId和resume操作

二、通道(channel)-在多个goroutine间的通信管道

Go语言提倡使用通信的方式代替共享内存,就是所谓的通道(channel)

1、通道的特性

任何时候,只能有一个goroutine进行发送和获取数据

2、通道的使用:并发打印数字

创建一个无缓冲通道,使用无缓冲通道时,装入方将被阻塞,直到数据在另一个goroutine被取出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func printer(c chan int) {
// 开始无限循环等待数据
for {
// 从channel中获取一个数据
data := <-c
// 将0视为结束
if data == 0 {
break
}
// 打印数据
fmt.Println(data)
}
// 通知main函数已经循环结束
c <- 0
}

func main() {
// 创建一个channel
c := make(chan int)
// 并发执行printer,传入channel
go printer(c)
for i := 1; i <= 10; i++ {
// 通过channel将数据传给printer
c <- i
}
// 通知并发的printer结束循环等待
c <- 0
// 等待printer结束
<-c
}

3、单向通道-通道中的单行道

Go通道可以在声明时约束其操作方向,如只发送或接收

1
2
3
4
5
6
7
ch := make(chan int)
// 声明一个只能发送的通道类型,并赋值未ch
var chSendOnly chan<- int = ch
// 声明一个只能接受的通道类型
var chRecvOnly <-chan int = ch
chSendOnly <- 0
<- chRecvOnly

4、带缓冲的通道

带缓冲的通道无需等待接收方接收即可完成发送过程,并且不会阻塞,只有存储空间满的时候才会发生阻塞

1
2
3
4
5
6
7
8
9
10
// 创建一个3个元素缓冲区大小的整形通道
ch := make(chan int, 3)
// 查看当前通道的大小
fmt.Println(len(ch))
// 发送三个整形元素到通道
ch <- 1
ch <- 2
ch <- 3
// 查看当前通道的大小
fmt.Println(len(ch))

阻塞条件:

  • 带缓冲通道满继续发送数据
  • 带缓冲通道空继续接收数据

Go语言对通道限制长度的原因

一方生产数据,一方消费数据,当提供数据一方速度大于消费数据一方速度时候,若不限制长度,会撑爆内存

5、模拟远程过程调用(RPC)

服务器开发会使用RPC(远程调用)简化通信过程,使得远程通信如同本地函数调用一样

(1)客户端请求和接收封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 模拟RPC客户端请求和接收
func RPCClient(ch chan string, req string) (string, error) {
// 向服务器发送请求
ch <- req
// 等待服务器返回
select {
// 接收到数据
case ack := <-ch:
return ack, nil
// 超时
case <-time.After(time.Second):
return "", errors.New("time Out")
}
}
  • 模拟socket向服务器发送一个字符串,服务器接收后结束阻塞执行下一行
  • 使用select进行多路复用
  • time.After(time.Second)通过这个返回的通道在指定时间后返回当前时间

两个case会同时执行,看哪个通道先返回数据

(2)服务器接收和返回数据

1
2
3
4
5
6
7
8
9
10
11
// 模拟RPC服务端接收客户端请求和相应
func RPCServer(ch chan string) {
for {
// 接收客户端请求
data := <-ch
// 打印接受的数据
fmt.Println("server received : ", data)
// 向客户端反馈已收到
ch <-"roger"
}
}

使用无限循环处理客户端请求

在main函数中分别调用两个函数,输出如下:

(3)模拟超时

在RPCServer中加入一段代码即可:time.Sleep(time.Second * 2),通过睡眠阻塞代码

四、同步-保证并发环境下数据访问的正确性

在某些轻量级场合,原子访问(atomic包)、互斥锁(sync.Mutex)以及等待组(sync.WaitGroup)能最大程度满足需求

1、竞态检测-检测代码并发环境下可能出现的问题

我们执行以下代码,就会发现错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var (
seq int64
)

func GenID() int64 {
// 尝试原子增加序号
atomic.AddInt64(&seq, 1)
return seq
}

func main() {
for i := 0; i < 10; i++ {
go GenID()
}
fmt.Println(GenID())
}

输出结果每次都不一样,原因是return seq有竞态问题,将代码改成如下所示就没问题了:return atomic.AddInt64(&seq, 1)

2、互斥锁(sync.Mutex)

使用锁非常简单

1
2
3
4
5
6
// 声明锁
var lock sync.Mutex
// 锁定
lock.Lock()
// 取消锁定
lock.Unlock()

加锁解锁过程保证原子性

3、读写互斥锁(sync.RWMutex)

在读多写少的环境中,可以使用读写互斥锁

基本使用类似

1
2
3
var lockr sync.RWMutex
lockr.RLock()
lockr.RLocker()

4、等待组(sync.WaitGroup)

保证并发环境中完成指定的任务数,等待组内部有一个计数器,计数器的值可以通过调用方法进行加减

直接上代码看演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
// 声明一个等待组
var wg sync.WaitGroup
// 准备一系列网站
var urls = []string{
"https://www.github.com",
"https://www.qiniu.com/",
"https://www.baidu.com/",
}
for _, url := range urls {
wg.Add(1)
go func(url string) {
defer wg.Done()
_, err := http.Get(url)
fmt.Println(url, err)
// 传递url
}(url)
}
wg.Wait()
fmt.Println("over")
}

所有的任务(网站响应)完成后,wait就会停止阻塞状态向下执行