Go 并发

date
Apr 18, 2021
slug
30
status
Published
tags
Go
summary
type
Post

函数

函数编写的顺序是不重要的.
func sum(a int, b int) (int){
    return a + b
}
func main() {
    fmt.Println(sum(1,2))
}
可变参数
... 这个语法糖必须得是最后一个参数
// 使用 ...类型,表示一个元素为int类型的
如果多个类型不一致可以用interface{} 实现
import "fmt"
func MyPrintf(args ...interface{}) {
    for _, arg := range args {
        switch arg.(type) {
            case int:
                fmt.Println(arg, "is an int value.")
            case string:
                fmt.Println(arg, "is a string value.")
            case int64:
                fmt.Println(arg, "is an int64 value.")
            default:
                fmt.Println(arg, "is an unknown type.")
        }
    }
}

func main() {
    var v1 int = 1
    var v2 int64 = 234
    var v3 string = "hello"
    var v4 float32 = 1.234
    MyPrintf(v1, v2, v3, v4)
}
多个参数赋值给别个函数
import "fmt"

func sum(args ...int) int {
    var result int
    for _, v := range args {
        result += v
    }
    return result
}

func Sum(args ...int) int {
    // 这里这里这里这里这里这里这里
    result := sum(args...)
    return result
}
func main() {
    fmt.Println(Sum(1, 2, 3))
}
返回值可有可无,看定义. return 后面也可有可无, 看定义. 返回值也可以有多个, 看定义.
func double(a int) (int, int) {
 b := a * 2
 return a, b
}
func main() {
    // 接收参数用逗号分隔
 a, b := double(2)
 fmt.Println(a, b)
}
返回值可以是一个变量, 这样就不用 return 特别注明了.
func double(a int) (b int) {
    // 不能使用 := ,因为在返回值哪里已经声明了为int
		 b = a * 2
    // 不需要指明写回哪个变量,在返回值类型那里已经指定了
 return
}
func main() {
 fmt.Println(double(2))
}
// output: 4

匿名函数

func(data int) {
    fmt.Println("hello", data)
}(100)
// ------------------------------------------------
// 第二个参数为函数
func visit(list []int, f func(int)) {
    for _, v := range list {
        // 执行回调函数
        f(v)
    }
}
func main() {
    // 使用匿名函数直接做为参数
    visit([]int{1, 2, 3, 4}, func(v int) {
        fmt.Println(v)
    })
}

// 匿名函数赋值变量
package main

import (
    "fmt"
    "math"
)

func main() {
    getSqrt := func(a float64) float64 {
        return math.Sqrt(a)
    }
    fmt.Println(getSqrt(4))
}

runtime包

runtime.Gosched()
让出时间片,放弃当前goroutine
runtime.Goexit()
退出当前协程
runtime.GOMAXPROCS()
配置多少个线程执行多少个协程, 默认CPU核数

goroutine

channel

var pipline chan int
pipline = make(chan int, 0)
//或者
pipline := make(chan int, 0)

// 往信道中发送数据
pipline<- 200
// 从信道中取出数据,并赋值给mydata
mydata := <-pipline

// 关闭
close(pipline)
关闭channel之后, 可以取数据,但是数据恒为零值. 已经关闭channel之后,二次关闭将报错.
  1. 对一个关闭的通道再发送值就会导致panic
  1. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  1. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  1. 关闭一个已经关闭的通道会导致panic。
  1. channel可以被垃圾回收,所以不需要每次都手动关闭,看需求
判断channel是否关闭
x, ok := <-pipline
获取channel 容量和长度
package main

import "fmt"

func main() {
    pipline := make(chan int, 10)
    fmt.Printf("信道可缓冲 %d 个数据\n", cap(pipline)) // 容量
    pipline<- 1
    fmt.Printf("信道中当前有 %d 个数据", len(pipline)) // 长度
}
无缓冲通道, 发送和读取都会阻塞.
单向信道, 默认是双向的. 通常做法是定义一个单向通道然后将双向通道复制给单向通道,否则单向通道无法使用.
func counter(out chan<- int) {
    for i := 0; i < 100; i++ {
        out <- i
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for i := range in {
        out <- i * i
    }
    close(out)
}
func printer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}

例子

协程池
package main

import (
	"fmt"
	"math/rand"
)

type Job struct {
	// id
	Id int
	// 需要计算的随机数
	RandNum int
}

type Result struct {
	// 这里必须传对象实例
	job *Job
	// 求和
	sum int
}

func main() {
	// 需要2个管道
	// 1.job管道
	jobChan := make(chan *Job, 128)
	// 2.结果管道
	resultChan := make(chan *Result, 128)
	// 3.创建工作池
	createPool(64, jobChan, resultChan)
	// 4.开个打印的协程
	go func(resultChan chan *Result) {
		// 遍历结果管道打印
		for result := range resultChan {
			fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id,
				result.job.RandNum, result.sum)
		}
	}(resultChan)
	var id int
	// 循环创建job,输入到管道
	for {
		id++
		// 生成随机数
		r_num := rand.Int()
		job := &Job{
			Id:      id,
			RandNum: r_num,
		}
		jobChan <- job
	}
}

// 创建工作池
// 参数1:开几个协程
func createPool(num int, jobChan chan *Job, resultChan chan *Result) {
	// 根据开协程个数,去跑运行
	for i := 0; i < num; i++ {
		go func(jobChan chan *Job, resultChan chan *Result) {
			// 执行运算
			// 遍历job管道所有数据,进行相加
			for job := range jobChan {
				// 随机数接过来
				r_num := job.RandNum
				// 随机数每一位相加
				// 定义返回值
				var sum int
				for r_num != 0 {
					tmp := r_num % 10
					sum += tmp
					r_num /= 10
				}
				// 想要的结果是Result
				r := &Result{
					job: job,
					sum: sum,
				}
				//运算结果扔到管道
				resultChan <- r
			}
		}(jobChan, resultChan)
	}
}
Timer:时间到了,执行只执行1次
Ticker:时间到了,多次执行

select

和switch类似,但是作用域chan, default 也可以没有, 通常没有?
select {
    case <-chan1:
       // 如果chan1成功读到数据,则进行该case处理语句
    case chan2 <- 1:
       // 如果成功向chan2写入数据,则进行该case处理语句
    default:
       // 如果上面都没有成功,则进入default处理流程
    }

锁/ 同步

虽然数据通信可以用chan,但是多个channel还是不可避免的会遇到数据共享和同步问题. 所以锁还是有用的. 锁的原理就是无缓冲chan实现, 有sync包提供.
var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
    for i := 0; i < 5000; i++ {
        lock.Lock() // 加锁
        x = x + 1
        lock.Unlock() // 解锁
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

Sync

sync.WaitGroup 类似 python中的wait
 
var wg sync.WaitGroup

func hello() {
    defer wg.Done()
    fmt.Println("Hello Goroutine!")
}
func main() {
    wg.Add(1)
    go hello() // 启动另外一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
    wg.Wait()
}
sync.Once
func (o *Once) Do(f func()) {}
var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
    icons = map[string]image.Image{
        "left":  loadIcon("left.png"),
        "up":    loadIcon("up.png"),
        "right": loadIcon("right.png"),
        "down":  loadIcon("down.png"),
    } // 并发不安全
}

// Icon 是并发安全的
func Icon(name string) image.Image {
    loadIconsOnce.Do(loadIcons) // 这里就变成安全的了,而且还有缓存, 只执行一次
    return icons[name]
}
sync.Map
并发安全的Map
var m = sync.Map{}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            m.Store(key, n)
            value, _ := m.Load(key)
            fmt.Printf("k=:%v,v:=%v\n", key, value)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

GMP原理

 
notion image
 
Processor,它包含了运行 goroutine 的资源,如果线程想运行 goroutine,必须先获取 P,P 中还包含了可运行的 G 队列。
 
notion image
  • 全局队列(Global Queue):存放等待运行的 G。
  • P 的本地队列:同全局队列类似,存放的也是等待运行的 G,存的数量有限,不超过 256 个。新建 G’时,G’优先加入到 P 的本地队列,如果队列满了,则会把本地队列中一半的 G 移动到全局队列。
  • P 列表:所有的 P 都在程序启动时创建,并保存在数组中,最多有 GOMAXPROCS(可配置) 个。
  • M:线程想运行任务就得获取 P,从 P 的本地队列获取 G,P 队列为空时,M 也会尝试从全局队列拿一批 G 放到 P 的本地队列,或从其他 P 的本地队列偷一半放到自己 P 的本地队列。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。
  • M 与 P 的数量没有绝对关系.
  • P由GOMAXPROCS决定.
  • M是根据运行时自动创建销毁, 最多10000个. 但是10000个系统扛不住.所以约等于没有限制.
 

调度器策略

work stealing 机制
当本线程无可运行的 G 时,尝试从其他线程绑定的 P 偷取 G,而不是销毁线程。
hand off 机制
当本线程因为 G 进行系统调用阻塞时,线程释放绑定的 P,把 P 转移给其他空闲的线程执行。
抢占:在 coroutine 中要等待一个协程主动让出 CPU 才执行下一个协程,在 Go 中,一个 goroutine 最多占用 CPU 10ms,防止其他 goroutine 被饿死,这就是 goroutine 不同于 coroutine 的一个地方。
全局G队列 != 所有P的队列的G的和

go func () 调度流程

 
notion image
1、我们通过 go func () 来创建一个 goroutine;
2、有两个存储 G 的队列,一个是局部调度器 P 的本地队列、一个是全局 G 队列。新创建的 G 会先保存在 P 的本地队列中,如果 P 的本地队列已经满了就会保存在全局的队列中;
3、G 只能运行在 M 中,一个 M 必须持有一个 P,M 与 P 是 1:1 的关系。M 会从 P 的本地队列弹出一个可执行状态的 G 来执行,如果 P 的本地队列为空,就会想其他的 MP 组合偷取一个可执行的 G 来执行;
4、一个 M 调度 G 执行的过程是一个循环机制;
5、当 M 执行某一个 G 时候如果发生了 syscall 或则其余阻塞操作,M 会阻塞,如果当前有一些 G 在执行,runtime 会把这个线程 M 从 P 中摘除 (detach),然后再创建一个新的操作系统的线程 (如果有空闲的线程可用就复用空闲线程) 来服务于这个 P;
6、当 M 系统调用结束时候,这个 G 会尝试获取一个空闲的 P 执行,并放入到这个 P 的本地队列。如果获取不到 P,那么这个线程 M 变成休眠状态, 加入到空闲线程中,然后这个 G 会被放入全局队列中。
 
 

© chaleaoch 2021