Go语言全栈开发:并发(下)

绝地灬酷狼 2023-02-12 10:23 122阅读 0赞






并发安全和锁

有时候在Go代码中可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)。类比现实生活中的例子有十字路口被各个方向的的汽车竞争;还有火车上的卫生间被车厢里的人竞争。

举个例子:

  1. var x int64
  2. var wg sync.WaitGroup
  3. func add() {
  4. for i := 0; i < 5000; i++ {
  5. x = x + 1
  6. }
  7. wg.Done()
  8. }
  9. func main() {
  10. wg.Add(2)
  11. go add()
  12. go add()
  13. wg.Wait()
  14. fmt.Println(x)
  15. }

上面的代码中我们开启了两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,导致最后的结果与期待的不符。

| 互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。 使用互斥锁来修复上面代码的问题:

  1. var x int64
  2. var wg sync.WaitGroup
  3. var lock sync.Mutex
  4. func add() {
  5. for i := 0; i < 5000; i++ {
  6. lock.Lock() // 加锁
  7. x = x + 1
  8. lock.Unlock() // 解锁
  9. }
  10. wg.Done()
  11. }
  12. func main() {
  13. wg.Add(2)
  14. go add()
  15. go add()
  16. wg.Wait()
  17. fmt.Println(x)
  18. }

使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。

| 读写互斥锁

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。

读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

读写锁示例:

  1. var (
  2. x int64
  3. wg sync.WaitGroup
  4. lock sync.Mutex
  5. rwlock sync.RWMutex
  6. )
  7. func write() {
  8. // lock.Lock() // 加互斥锁
  9. rwlock.Lock() // 加写锁
  10. x = x + 1
  11. time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
  12. rwlock.Unlock() // 解写锁
  13. // lock.Unlock() // 解互斥锁
  14. wg.Done()
  15. }
  16. func read() {
  17. // lock.Lock() // 加互斥锁
  18. rwlock.RLock() // 加读锁
  19. time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
  20. rwlock.RUnlock() // 解读锁
  21. // lock.Unlock() // 解互斥锁
  22. wg.Done()
  23. }
  24. func main() {
  25. start := time.Now()
  26. for i := 0; i < 10; i++ {
  27. wg.Add(1)
  28. go write()
  29. }
  30. for i := 0; i < 1000; i++ {
  31. wg.Add(1)
  32. go read()
  33. }
  34. wg.Wait()
  35. end := time.Now()
  36. fmt.Println(end.Sub(start))
  37. }

需要注意的是读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。

| sync.WaitGroup

在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:

在这里插入图片描述
sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。

我们利用sync.WaitGroup将上面的代码优化一下:

  1. var wg sync.WaitGroup
  2. func hello() {
  3. defer wg.Done()
  4. fmt.Println("Hello Goroutine!")
  5. }
  6. func main() {
  7. wg.Add(1)
  8. go hello() // 启动另外一个goroutine去执行hello函数
  9. fmt.Println("main goroutine done!")
  10. wg.Wait()
  11. }

需要注意sync.WaitGroup是一个结构体,传递的时候要传递指针。

| sync.Once

说在前面的话:这是一个进阶知识点。

在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。

Go语言中的sync包中提供了一个针对只执行一次场景的解决方案–sync.Once

sync.Once只有一个Do方法,其签名如下:

  1. func (o *Once) Do(f func()) { }

备注:如果要执行的函数f需要传递参数就需要搭配闭包来使用。

加载配置文件示例

延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的。我们来看一个例子:

  1. var icons map[string]image.Image
  2. func loadIcons() {
  3. icons = map[string]image.Image{
  4. "left": loadIcon("left.png"),
  5. "up": loadIcon("up.png"),
  6. "right": loadIcon("right.png"),
  7. "down": loadIcon("down.png"),
  8. }
  9. }
  10. // Icon 被多个goroutine调用时不是并发安全的
  11. func Icon(name string) image.Image {
  12. if icons == nil {
  13. loadIcons()
  14. }
  15. return icons[name]
  16. }

多个goroutine并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每个goroutine都满足串行一致的基础上自由地重排访问内存的顺序。loadIcons函数可能会被重排为以下结果:

  1. func loadIcons() {
  2. icons = make(map[string]image.Image)
  3. icons["left"] = loadIcon("left.png")
  4. icons["up"] = loadIcon("up.png")
  5. icons["right"] = loadIcon("right.png")
  6. icons["down"] = loadIcon("down.png")
  7. }

在这种情况下就会出现即使判断了icons不是nil也不意味着变量初始化完成了。考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化icons的时候不会被其他的goroutine操作,但是这样做又会引发性能问题。

使用sync.Once改造的示例代码如下:

  1. var icons map[string]image.Image
  2. var loadIconsOnce sync.Once
  3. func loadIcons() {
  4. icons = map[string]image.Image{
  5. "left": loadIcon("left.png"),
  6. "up": loadIcon("up.png"),
  7. "right": loadIcon("right.png"),
  8. "down": loadIcon("down.png"),
  9. }
  10. }
  11. // Icon 是并发安全的
  12. func Icon(name string) image.Image {
  13. loadIconsOnce.Do(loadIcons)
  14. return icons[name]
  15. }

并发安全的单例模式

下面是借助sync.Once实现的并发安全的单例模式:

  1. package singleton
  2. import (
  3. "sync"
  4. )
  5. type singleton struct { }
  6. var instance *singleton
  7. var once sync.Once
  8. func GetInstance() *singleton {
  9. once.Do(func() {
  10. instance = &singleton{ }
  11. })
  12. return instance
  13. }

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

| sync.Map

Go语言中内置的map不是并发安全的。请看下面的示例:

  1. var m = make(map[string]int)
  2. func get(key string) int {
  3. return m[key]
  4. }
  5. func set(key string, value int) {
  6. m[key] = value
  7. }
  8. func main() {
  9. wg := sync.WaitGroup{ }
  10. for i := 0; i < 20; i++ {
  11. wg.Add(1)
  12. go func(n int) {
  13. key := strconv.Itoa(n)
  14. set(key, n)
  15. fmt.Printf("k=:%v,v:=%v\n", key, get(key))
  16. wg.Done()
  17. }(i)
  18. }
  19. wg.Wait()
  20. }

上面的代码开启少量几个goroutine的时候可能没什么问题,当并发多了之后执行上面的代码就会报fatal error: concurrent map writes错误。

像这种场景下就需要为map加锁来保证并发的安全性了,Go语言的sync包中提供了一个开箱即用的并发安全版map–sync.Map。开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map内置了诸如StoreLoadLoadOrStoreDeleteRange等操作方法。

  1. var m = sync.Map{ }
  2. func main() {
  3. wg := sync.WaitGroup{ }
  4. for i := 0; i < 20; i++ {
  5. wg.Add(1)
  6. go func(n int) {
  7. key := strconv.Itoa(n)
  8. m.Store(key, n)
  9. value, _ := m.Load(key)
  10. fmt.Printf("k=:%v,v:=%v\n", key, value)
  11. wg.Done()
  12. }(i)
  13. }
  14. wg.Wait()
  15. }






原子操作

代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic提供。

| atomic包

在这里插入图片描述
| 示例

我们填写一个示例来比较下互斥锁和原子操作的性能。

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. type Counter interface {
  9. Inc()
  10. Load() int64
  11. }
  12. // 普通版
  13. type CommonCounter struct {
  14. counter int64
  15. }
  16. func (c CommonCounter) Inc() {
  17. c.counter++
  18. }
  19. func (c CommonCounter) Load() int64 {
  20. return c.counter
  21. }
  22. // 互斥锁版
  23. type MutexCounter struct {
  24. counter int64
  25. lock sync.Mutex
  26. }
  27. func (m *MutexCounter) Inc() {
  28. m.lock.Lock()
  29. defer m.lock.Unlock()
  30. m.counter++
  31. }
  32. func (m *MutexCounter) Load() int64 {
  33. m.lock.Lock()
  34. defer m.lock.Unlock()
  35. return m.counter
  36. }
  37. // 原子操作版
  38. type AtomicCounter struct {
  39. counter int64
  40. }
  41. func (a *AtomicCounter) Inc() {
  42. atomic.AddInt64(&a.counter, 1)
  43. }
  44. func (a *AtomicCounter) Load() int64 {
  45. return atomic.LoadInt64(&a.counter)
  46. }
  47. func test(c Counter) {
  48. var wg sync.WaitGroup
  49. start := time.Now()
  50. for i := 0; i < 1000; i++ {
  51. wg.Add(1)
  52. go func() {
  53. c.Inc()
  54. wg.Done()
  55. }()
  56. }
  57. wg.Wait()
  58. end := time.Now()
  59. fmt.Println(c.Load(), end.Sub(start))
  60. }
  61. func main() {
  62. c1 := CommonCounter{ } // 非并发安全
  63. test(c1)
  64. c2 := MutexCounter{ } // 使用互斥锁实现并发安全
  65. test(&c2)
  66. c3 := AtomicCounter{ } // 并发安全且比互斥锁效率更高
  67. test(&c3)
  68. }

atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。

练习题

1.使用goroutine和channel实现一个计算int64随机数各位数和的程序。

  1. 开启一个goroutine循环生成int64类型的随机数,发送到jobChan
  2. 开启24个goroutine从jobChan中取出随机数计算各位数的和,将结果发送到resultChan
  3. 主goroutine从resultChan取出结果并打印到终端输出

2.为了保证业务代码的执行性能将之前写的日志库改写为异步记录日志方式。

【对比python 接着上次的进程隔离】

守护进程,会随着主进程的结束而结束。

主进程创建守护进程

其一:守护进程会在主进程代码执行结束后就终止

其二:守护进程内无法再开启子进程,否则抛出异常:

AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

守护进程的启动

  1. import os
  2. import time
  3. from multiprocessing import Process
  4. class Myprocess(Process):
  5. def __init__(self,person):
  6. super().__init__()
  7. self.person = person
  8. def run(self):
  9. print(os.getpid(),self.name)
  10. print('%s正在和美女聊天' %self.person)
  11. p=Myprocess('winstonfy')
  12. p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
  13. p.start()
  14. time.sleep(10) # 在sleep时查看进程id对应的进程ps -ef|grep id
  15. print('主')

主进程代码执行结束守护进程立即结束

  1. from multiprocessing import Process
  2. def foo():
  3. print(123)
  4. time.sleep(1)
  5. print("end123")
  6. def bar():
  7. print(456)
  8. time.sleep(3)
  9. print("end456")
  10. p1=Process(target=foo)
  11. p2=Process(target=bar)
  12. p1.daemon=True
  13. p1.start()
  14. p2.start()
  15. time.sleep(0.1)
  16. print("main-------")#打印该行则主进程代码结束,则守护进程p1应该被终止.
  17. #可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止.

进程同步(multiprocess.Lock)

锁 —— multiprocess.Lock

当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。

多进程抢占输出资源

  1. import os
  2. import time
  3. import random
  4. from multiprocessing import Process
  5. def work(n):
  6. print('%s: %s is running' %(n,os.getpid()))
  7. time.sleep(random.random())
  8. print('%s:%s is done' %(n,os.getpid()))
  9. if __name__ == '__main__':
  10. for i in range(3):
  11. p=Process(target=work,args=(i,))
  12. p.start()

由并发变成了串行,牺牲了运行效率,但避免了竞争;使用锁维护执行顺序

  1. import os
  2. import time
  3. import random
  4. from multiprocessing import Process,Lock
  5. def work(lock,n):
  6. lock.acquire()
  7. print('%s: %s is running' % (n, os.getpid()))
  8. time.sleep(random.random())
  9. print('%s: %s is done' % (n, os.getpid()))
  10. lock.release()
  11. if __name__ == '__main__':
  12. lock=Lock()
  13. for i in range(3):
  14. p=Process(target=work,args=(lock,i))
  15. p.start()

多进程模拟抢票实例

  1. #文件db的内容为:{"count":1}
  2. #注意一定要用双引号,不然json无法识别
  3. from multiprocessing import Process,Lock
  4. import time,json,random
  5. def search():
  6. dic=json.load(open('db'))
  7. print('\033[43m剩余票数%s\033[0m' %dic['count'])
  8. def get():
  9. dic=json.load(open('db'))
  10. time.sleep(0.1) #模拟读数据的网络延迟
  11. if dic['count'] >0:
  12. dic['count']-=1
  13. time.sleep(0.2) #模拟写数据的网络延迟
  14. json.dump(dic,open('db','w'))
  15. print('\033[43m购票成功\033[0m')
  16. def task():
  17. search()
  18. get()
  19. if __name__ == '__main__':
  20. for i in range(100): #模拟并发100个客户端抢票
  21. p=Process(target=task)
  22. p.start()
  23. # 引发问题:数据写入错乱
  24. # 互斥锁保证数据安全
  25. from multiprocessing import Process,Lock
  26. import time,json,random
  27. def search():
  28. dic=json.load(open('db'))
  29. print('\033[43m剩余票数%s\033[0m' %dic['count'])
  30. def get():
  31. dic=json.load(open('db'))
  32. time.sleep(random.random()) # 模拟读数据的网络延迟
  33. if dic['count'] >0:
  34. dic['count']-=1
  35. time.sleep(random.random()) # 模拟写数据的网络延迟
  36. json.dump(dic,open('db','w'))
  37. print('\033[32m购票成功\033[0m')
  38. else:
  39. print('\033[31m购票失败\033[0m')
  40. def task(lock):
  41. search()
  42. lock.acquire() # 将买票这一环节由并发变成了串行,牺牲了运行效率但是保证了数据的安全
  43. get()
  44. lock.release()
  45. if __name__ == '__main__':
  46. lock = Lock()
  47. for i in range(100): # 模拟并发100个客户端抢票
  48. p=Process(target=task,args=(lock,))
  49. p.start()

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,

即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

队列与管道待续。。。

发表评论

表情:
评论列表 (有 0 条评论,122人围观)

还没有评论,来说两句吧...

相关阅读

    相关 前端开发

    一、Malagu 框架 Malagu 是一个基于 TypeScript 的无服务器第一、基于组件、独立于平台的渐进式应用程序框架。 目前,微服务的落地方案有很多,也相当