go语言并发

作者: adm 分类: go 发布时间: 2023-04-05

go语言并发

//启动多个groutine
 
var wg sync.WaitGroup  //一个计数器一样的东西,用来统计gorountine的启动数量.
 
func hello(i int) {
	defer wg.Done()
	fmt.Println("hello Goroutine!", i)
 
}
func main() {
	for i := 0; i < 10; i++ {  //启动9 个groutine 
		wg.Add(1) 没循环一次就+1个计数
		go hello(i)  //gorountine 协成启动主体.
	}
	wg.Wait() //只有这个位置等于0 的时候才能继续向下执行剩余代码.
}

输出结果

C:\Users\34826\AppData\Local\Temp\___go_build_main_go__1_.exe #gosetup
hello Goroutine! 9
hello Goroutine! 1
hello Goroutine! 5
hello Goroutine! 3
hello Goroutine! 4
hello Goroutine! 7
hello Goroutine! 0
hello Goroutine! 6
hello Goroutine! 8
hello Goroutine! 2
 

说明一下:
上面的代码执行多次输出结果是不一样的.因为虽然gorountine在一个循环体内,但是启动的时机并不是按照顺序进行启动的.所以最终打印的状态是乱序的.
指定逻辑核心数

//指定逻辑核心数.
func a() {
	for i := 0; i < 10; i++ {
		fmt.Println("A:", i)
	}
}
func b() {
	for i := 1; i < 10; i++ {
		fmt.Println("B:", i)
	}
}
 
func main() {
	runtime.GOMAXPROCS(2)  //在goroutine启动前设置逻辑线程数量.
	go a()
	go b()
	time.Sleep(time.Second * 20)
}

输出结果:

B: 2
A: 1
B: 3
A: 2
B: 4
A: 3
B: 5
A: 4
B: 6
A: 5
B: 7
A: 6
B: 8
A: 7
B: 9
A: 8
A: 9

这里说明一下,如果cpu运行足够快的话那么即使指定了2个逻辑核心也不一定会将两个gorountine调度到两个逻辑核心上.因为协程调度也需要时间.如果执行时间小于调度时间,那么goroutine就不会做调度操作.

channel的使用

//channel
 
func recv(ch chan int) {
	ret := <-ch
	fmt.Println(ret)
}
 
//无缓冲通道
func main() {
	ch := make(chan int)
	ch <- 10    //
	go recv(ch) //这里说明一下.刚刚发现一个问题,虽然gorountine启动时无序的.但是在main函数中是有一定顺序的,
	// 比如 假设先给ch进行赋值操作,那么如果没有缓冲区的话在赋值的那一刻chan是无法将这个量装进ch里面的.
	//所以在使用无缓冲区的ch的时候要先1启动goroutine.有接受者在才能消费传进来的数据,但是这个问题在有缓冲通道的情况下可以随意.
 
	fmt.Println("发送成功")
 
}
 

输出结果

C:\Users\34826\AppData\Local\Temp\___go_build_main_go__1_.exe #gosetup
fatal error: all goroutines are asleep - deadlock!
 
goroutine 1 [chan send]:
main.main()
        D:/Go/Go/src/code.oldboy.com/studygolang/05lesson5/xueshengguanlixit/mai
n.go:50 +0x65

第二次调整一下gorountine和ch的位置.

func recv(ch chan int) {
	ret := <-ch
	fmt.Println(ret)
}
 
//无缓冲通道
func main() {
	ch := make(chan int)
	//
	go recv(ch) 
	ch <- 10
	fmt.Println("发送成功")
}

输出结果

C:\Users\34826\AppData\Local\Temp\___go_build_main_go__1_.exe #gosetup
10

发送成功

或者声明ch的时候创建才有换成区的ch

这里说明一下.
刚刚发现一个问题,虽然gorountine启动时无序的.但是在main函数中是有一定顺序的,
比如 假设先给ch进行赋值操作,那么如果没有缓冲区的话在赋值的那一刻chan是无法将这个量装进ch里面的.
所以在使用无缓冲区的ch的时候要先1启动goroutine.有接受者在才能消费传进来的数据,但是这个问题在有缓冲通道的情况下可以随意.

有缓冲的ch通道

有缓冲的通道会将传到ch中的数据暂时存起来,等到后续的gorountine进行调用,但是如果ch缓冲通道满了那么就无法再向通道中发送数据.那么这时候就会出现deadlock

func recv(ch chan int) {
	ret := <-ch
	fmt.Println(ret)
}
 
//无缓冲通道
func main() {
	ch := make(chan int, 1)
	ch <- 10
	ch <- 22
	go recv(ch)
	fmt.Println("发送成功")
}

输出结果

C:\Users\34826\AppData\Local\Temp\___go_build_main_go__1_.exe #gosetup
fatal error: all goroutines are asleep - deadlock!
 
goroutine 1 [chan send]:
main.main()
        D:/Go/Go/src/code.oldboy.com/studygolang/05lesson5/xueshengguanlixit/mai
n.go:51 +0x7f

如果将初始的ch缓冲值增加就可以解决死锁的问题.

func recv(ch chan int) {
	ret := <-ch
	fmt.Println(ret)
}
 
//无缓冲通道
func main() {
	ch := make(chan int, 2) //只要大于发送数量都可以.
	ch <- 10
	ch <- 22
	go recv(ch)
	fmt.Println("发送成功")
}
 

输出结果

C:\Users\34826\AppData\Local\Temp\___go_build_main_go__1_.exe #gosetup

发送成功

这个过程就好比小区快递柜.有一定的容量限制.也可以向内部放数据,但是只要数据放满了就需要有人取出最早方的数据才可以继续往里放.

循环取值

//循环取值

func main() {
	ch1 := make(chan int) //声明一个ch1无缓冲通道
	ch2 := make(chan int) //声明一个ch2无缓冲通道
	go func() {           //创建一个goroutine匿名函数,并循环向ch1中写入数字
		for i := 0; i < 100; i++ {
			ch1 <- i
		}
		close(ch1) //数据发送完成后关闭ch1通道
	}()
	go func() { //启动一个匿名goroutine并且不断判断ch1中是否还有值,如果没有值就跳出循环.如果优质的话想ch2输出数值的平方
		for {
			i, ok := <-ch1
			if !ok {
				break
			}
			ch2 <- i * i
		}
		close(ch2) //ch2循环取值完成后关闭ch2通道
	}()
	for i := range ch2 { //这里注意一下,当range在遍历通道的时候只会输出通道中的数据.没有其他值.这里输出额是i的平方
		fmt.Println(i)
	}
}
 

输出结果:

9025
9216
9409
9604
9801

单向channel

//单向通道
 
func counter(ch1 chan<- int) {  //单向通道,只能往ch1里面写
	for i := 0; i < 100; i++ {
		ch1 <- i 
	}
	close(ch1) //关闭通道
}
func squarer(ch2 chan<- int, ch1 <-chan int) {  //从ch1往ch2里面写数据.
	for i := range ch1 {
		ch2 <- i * i
	}
	close(ch2) //这里说明下,貌似只有在函数中显示的使用到的chan才需要进行关闭,如果没有显示的使用通道的话,那么就不用进行关闭操作,这里in就不需要进行关闭.
}
 
func printer(ch2 <-chan int) {  //同样是单向通道,从chan里面向ch2里面写入数据.
	for i := range ch2 {
		fmt.Println(i)
	}
 
}
 
func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	go counter(ch1)  //与下面的gorouting几乎同时执行.
	go squarer(ch2, ch1) //从ch1中向ch2中传递数据
	printer(ch2) //从ch2中消费数据,
}
 

输出结果:

9216
9409
9604
9801

说明一下,这里我使用的是形参实参都是相同的名称,为了好区分.

说明一下:

关闭已经关闭的channel也会引发panic.

select多路复用

//select 多路复用
func main() {
	ch := make(chan int, 1)
	for i := 0; i < 10; i++ {
		time.Sleep(time.Second)
		select {
		case x := <-ch:
			fmt.Println(x)
		case ch <- i:
		}
	}
}

输出结果

C:\Users\34826\AppData\Local\Temp\___go_build_main_go__1_.exe #gosetup

因为ch通道只有1个缓冲,所以每次只能写进去一个值,当通道被占用的时候其他的值就写不进通道,那么结果就只能是打印被通道接受到的值.即使有default语句但是如果前面有任意一条满足要求了,那么default就不会执行.

并发安全和锁

//并发安全和锁
var x int64
var wg sync.WaitGroup
 
func add() {
	for i := 0; i < 5000; i++ {
		x = x + 1
	}
	wg.Done()
}
 
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
 
}
 

运行结果:
说明一下这里的结果是不确定的.因为在代码中启动了两个goroutine这两个存在数据竞争关系,即一个调用直接影响另一个的值.所以这里就不给具体结果了.

互斥锁

//互斥锁
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
 
func add() {
	for i := 0; i < 5000; i++ {
		lock.Lock()   //函数内调用加锁
		x += 1        //执行过程
		lock.Unlock() //函数调用解锁
	}
	wg.Done() //函数调用完 -1 需要在执行体外进行.
 
}
 
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
 
}
 

输出结果

C:\Users\34826\AppData\Local\Temp\___go_build_main_go__1_.exe #gosetup
10000
 

说明一下:
这里在执行过程中添加了互斥锁.在函数调用过程中不存在数据竞争的情况.因为只要某一个时刻有一个函数在调用x那么其他的调用就需要等待.只有等待解锁后才可以对x进行操作.

读写锁

var (
	x      int64
	wg     sync.WaitGroup
	lock   sync.Locker
	rwlock sync.RWMutex
)
 
func write() {
	//添加写锁
	rwlock.Lock()
	x = x + 1
	time.Sleep(time.Millisecond * 10) //假设写操作耗时10毫秒
	//解写锁
	rwlock.Unlock()
	wg.Done()
}
 
func read() {
	rwlock.RLock()               //添加读锁
	time.Sleep(time.Millisecond) //假设读操作为1毫秒
	fmt.Println(x)
	rwlock.RUnlock() //解读锁
	defer wg.Done()
}
 
func main() {
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go write()
	}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go read()
	}
	wg.Wait()
	end := time.Since(start)
	fmt.Println(end)
}
 

输出结果

C:\Users\34826\AppData\Local\Temp\___go_build_main_go__1_.exe #gosetup

读写锁适合读多写少的场景.如果读写比例比较接近那么就很难发挥其优势.

sync.waitgroup

(wg * WaitGroup) Add(delta int)	计数器+delta
(wg *WaitGroup) Done()	计数器-1
(wg *WaitGroup) Wait()	阻塞直到计数器变为0

代码

var wg sync.WaitGroup
 
func hello() {
	defer wg.Done() //这里使用defer 在函数结束之前进行wg计数器减操作
	fmt.Println("Hello Goroutine!")
}
func main() {
	wg.Add(1)  //这里预先指定好添加的goroutine数量
	go hello() // 启动另外一个goroutine去执行hello函数
	fmt.Println("main goroutine done!")
	wg.Wait() //计数清零才会向下执行其他代码
}
 

输出结果

C:\Users\34826\AppData\Local\Temp\___go_build_main_go__1_.exe #gosetup
main goroutine done!
Hello Goroutine!
 

这里在函数中使用defer配合wg.waitgroup相关方法

使用sync.once包加载一次配置信息

var icons map[string]image.Image
var loadIconsOnce sync.Once
 
func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}
 
func Icon(name string) image.Image {
	loadIconsOnce.Do(loadIcons) //这里就指明了使用sync.once加载一次loadicons
	return icons[name] //返回icons[name]
}
 

这里使用的是修改配置信息.

并发安全的sync.map

func main() {
	var testMap sync.Map
 
	// 获取a对应的键值应该失败
	valueA, ok := testMap.Load("a")   //直接获取test.Map中a键的值
	if ok {
		fmt.Println("a", valueA)
	} else {
		fmt.Println("a no value") // true
	}
 
	// 设置a对应的键值,然后再获取
	testMap.Store("a", "ok1")  //是通store设置键值对 test.store
	valueA, ok = testMap.Load("a")  //从testMap中获取键值
	if ok {
		fmt.Println("a", valueA) // true
	} else {
		fmt.Println("a no value")
	}
 
	// 设置a对应的键值,然后再获取
	testMap.Store("a", "ok2") //设置键值 a ok2
	valueA, ok = testMap.Load("a") //获取a键的值
	if ok {
		fmt.Println("a", valueA) // true
	} else {
		fmt.Println("a no value")
	}
 
	// 设置a对应的键值,然后再获取
	// 若a存在对应键值,则只是获取,不设置新值
	valueA, ok = testMap.LoadOrStore("a", "ok3") //如果存在就获取,如果不存在就设置
	if ok {
		fmt.Println("a", valueA) // true
	} else {
		fmt.Println("a no value")
	}
	valueA, ok = testMap.Load("a") //获取键值
	if ok {
		fmt.Println("a", valueA) // true
	} else {
		fmt.Println("a no value")
	}
	// 若a不存在对应键值,则进行设置
	valueB, ok := testMap.LoadOrStore("b", "ok3")  //如果不存在对应键值就进行设置,存在就获取.
	if ok {
		fmt.Println("b", valueB)
	} else {
		fmt.Println("b no value") // true
	}
	valueB, ok = testMap.Load("b")
	if ok {
		fmt.Println("b", valueB) // true
	} else {
		fmt.Println("b no value")
	}
 
	// 遍历
	testMap.Range(func(key, value interface{}) bool {
		keyInfo := key.(string)
		valueInfo := value.(string)
		fmt.Println("keyInfo", keyInfo, "valueInfo", valueInfo)
		return true
	})
 
	// 删除
	testMap.Delete("a")
}
 

输出结果

C:\Users\34826\AppData\Local\Temp\___go_build_main_go__1_.exe #gosetup
a no value
a ok1
a ok2
a ok2
a ok2
b no value
b ok3
keyInfo a valueInfo ok2
keyInfo b valueInfo ok3
 

atmic

type Counter interface {
	Inc()
	Load() int64
}
 
// 普通版
type CommonCounter struct {
	counter int64
}
 
func (c CommonCounter) Inc() {
	c.counter++
}
 
func (c CommonCounter) Load() int64 {
	return c.counter
}
 
// 互斥锁版
type MutexCounter struct {
	counter int64
	lock    sync.Mutex //添加一个互斥锁到结构体中.用来调用执行过程中前后的锁.
}
 
func (m *MutexCounter) Inc() {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.counter++
}
 
func (m *MutexCounter) Load() int64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.counter
}
 
// 原子操作版
type AtomicCounter struct {
	counter int64
}
 
func (a *AtomicCounter) Inc() {
	atomic.AddInt64(&a.counter, 1)
}
 
func (a *AtomicCounter) Load() int64 {
	return atomic.LoadInt64(&a.counter)
}
 
func test(c Counter) {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 10000000; i++ {
		wg.Add(1)
		go func() {
			c.Inc()
			wg.Done()
		}()
	}
	wg.Wait()
	end := time.Now()
	fmt.Println(c.Load(), end.Sub(start))
}
 
func main() {
	c1 := CommonCounter{} // 非并发安全
	test(c1)
	c2 := MutexCounter{} // 使用互斥锁实现并发安全
	test(&c2)
	c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高 我自己策略一下,其实自己加锁和使用atomic加锁效率差不多.一千万次执行相差0.03秒.第二次执行相差0.01秒.
	test(&c3)
}

输出结果

0 2.8134764s  //结果错误
10000000 2.8563644s //结果正确
10000000 2.8244477s //结果正确
和
0 2.788545s
10000000 2.865339s
10000000 2.8553674s

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!