如何用GO处理高峰期每分钟100万条数据请求

迈不过友情╰ 2024-04-18 14:19 103阅读 0赞

最近看了一篇文章,用go处理每分钟达百万条的数据请求

原文地址:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

翻译地址:https://www.jianshu.com/p/21de03ac682c

这里作者为处理高峰期高并发的数据请求,用了3个版本的处理方式,下面是自己的一些理解:

第一种方式很简单,就是用go的协程处理请求,来一条请求开一个协程处理,由于每个请求是一个数据上传

任务,有一定的耗时和资源消耗,当高峰期请求突然增多达到每分钟百万条的时候,不可避免的造成了携程爆炸,系统崩溃。

第二种方式用channel做了一个任务队列,来一条请求加到任务队列里,简单的生产者消费者模式,但这种方式治标不治本,高峰期的时候,任务队列长度一样会爆炸,造成内存不够用,所以也不太合适。

第三种方式就有点意思了,这里贴一下完整的代码,帮助理解。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. "runtime"
  6. )
  7. var (
  8. //最大任务队列数
  9. MaxWorker = 10
  10. )
  11. //有效载荷
  12. type Payload struct {
  13. Num int
  14. Test string
  15. }
  16. //待执行的工作
  17. type Job struct {
  18. Payload Payload
  19. }
  20. //任务队列channal
  21. var JobQueue chan Job
  22. //执行任务的工作者单元
  23. type Worker struct {
  24. WorkerPool chan chan Job //工作者池--每个元素是一个工作者的私有任务channal
  25. JobChannel chan Job //每个工作者单元包含一个任务管道 用于获取任务
  26. quit chan bool //退出信号
  27. no int //编号
  28. }
  29. // 停止信号
  30. func (w Worker) Stop() {
  31. go func() {
  32. w.quit <- true
  33. }()
  34. }
  35. //调度中心
  36. type Dispatcher struct {
  37. //工作者池(任务队列池)
  38. WorkerPool chan chan Job
  39. //工作者单元数量
  40. MaxWorkers int
  41. }
  42. //创建调度中心
  43. func NewDispatcher(maxWorkers int) *Dispatcher {
  44. //创建工作者池,存放待处理的任务队列,maxWorkers为最大任务队列数
  45. pool := make(chan chan Job, maxWorkers)
  46. return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
  47. }
  48. //创建一个新工作者单元
  49. func NewWorker(workerPool chan chan Job, no int) Worker {
  50. fmt.Println("创建一个新工作者单元")
  51. return Worker{
  52. WorkerPool: workerPool,
  53. JobChannel: make(chan Job),
  54. quit: make(chan bool),
  55. no: no,
  56. }
  57. }
  58. //循环 监听任务和结束信号
  59. func (w Worker) Start() {
  60. //启动协程,监听任务
  61. go func() {
  62. for {
  63. // register the current worker into the worker queue.
  64. //工作者放回工作者池
  65. w.WorkerPool <- w.JobChannel
  66. //fmt.Println("w.WorkerPool <- w.JobChannel", w)
  67. select {
  68. case job := <-w.JobChannel:
  69. //fmt.Println("job := <-w.JobChannel")
  70. // 收到任务,执行打印任务
  71. fmt.Println(job.Payload.Test)
  72. //执行任务需要1秒时间
  73. time.Sleep(500 * time.Microsecond)
  74. case <-w.quit:
  75. // 收到退出信号,停止监听,结束该协程
  76. return
  77. }
  78. }
  79. }()
  80. }
  81. //调度,任务分发
  82. func (d *Dispatcher) dispatch() {
  83. for {
  84. select {
  85. //从任务队列中获取任务
  86. case job := <-JobQueue:
  87. //fmt.Println("job := <-JobQueue:")
  88. go func(job Job) {
  89. //等待空闲worker (任务多的时候会阻塞这里)
  90. //从(10个)工作者池中获取一个任务队列channel,
  91. jobChannel := <-d.WorkerPool
  92. //fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
  93. // 将任务放到上述woker的私有任务channal中,jobChannel是一个无缓冲信道,每次只能放一个任务
  94. jobChannel <- job
  95. //fmt.Println("jobChannel <- job")
  96. }(job)
  97. }
  98. }
  99. }
  100. //工作者池的初始化,注意Run为Dispatcher结构体指针的方法,所以此方法内对Dispathcer的修改在方法外也可见
  101. func (d *Dispatcher) Run() {
  102. // starting n number of workers
  103. //创建10个工作者单元
  104. for i := 1; i < d.MaxWorkers+1; i++ {
  105. worker := NewWorker(d.WorkerPool, i)
  106. worker.Start()
  107. }
  108. go d.dispatch()
  109. }
  110. //新建任务并放入任务队列
  111. func addQueue() {
  112. for i := 0; i < 1000000; i++ {
  113. time.Sleep(10*time.Microsecond)
  114. fmt.Println("当前请求数:",i)
  115. // 新建一个任务
  116. payLoad := Payload{Num: 1, Test:"this is Test string"}
  117. work := Job{Payload: payLoad}
  118. // 任务放入任务队列channal
  119. fmt.Println("新任务入队列!")
  120. JobQueue <- work
  121. //fmt.Println("队列总长度:",cap(JobQueue))
  122. //fmt.Println("队列未消费任务数量:",len(JobQueue))
  123. //fmt.Println("JobQueue <- work")
  124. fmt.Println("当前协程数:", runtime.NumGoroutine())
  125. //time.Sleep(1 * time.Second)
  126. }
  127. }
  128. func main() {
  129. //建立任务队列,每个任务队列中可以放10个任务
  130. JobQueue = make(chan Job, 10)
  131. fmt.Println("成功建立任务队列!")
  132. //新建任务分发器
  133. dispatcher := NewDispatcher(MaxWorker)
  134. fmt.Println("成功建立任务分发器!")
  135. dispatcher.Run()
  136. //time.Sleep(1 * time.Second)
  137. go addQueue()
  138. time.Sleep(1000 * time.Second)
  139. }

这种方式简单来说就是用一个两级channel作为一个任务队列池,队列池中存放多个任务队列,来一个任务后加入其中的一个任务队列,每个任务队列开一个协程处理数据上传任务,基本过程如下图所示:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FuZHk3MTA2NjA1NDE_size_16_color_FFFFFF_t_70

  1. 这种方式很好的解决了每个任务处理都要开协称协程造成协程太多的问题,因为总共只有10个协程在处理work,并且工作池中只有10个任务队列,每个任务队列只存放一个任务,所以也不存在队列长度爆炸的问题,但是这种方式也有问题,就是当工作池中没有任务队列空闲的时候,新来的任务只能开一个协程等待空闲的任务队列,如果访问高峰期过长或者任务处理很耗时,会造成等待任务队列的协程急剧增长,最后也会造成系统内存崩溃。这种方式的好处在于等待任务队列的协程都是轻量级协程,每个协程占用的内存资源很少,所以如果只是处理高峰期每分钟百万条的数据请求,是可以的,相当于高峰期把每个任务缓存在轻量级协程里了,过了高峰期,轻量级协程就会减少,不会造成协程爆炸导致内存崩溃。但是如果每分钟百万条数据请求是常态,这种方式肯定也会造成内存崩溃。
  2. 解决的方式有两种:
  3. 1.数据库级别的缓存作为任务队列,硬盘比内存更大更便宜,但是这种方式的延迟会很大
  4. 2.就是加机器了做负载均衡了,没有什么高并发是加一台机器解决不了的,如果有,那就多加几台。
  5. 另外补充一点,第一种方式和第三种方式的好坏取决于每个任务处理时间和工作池中任务队列的数量,以处理每个任务需要5秒为例子,假如每个任务都需要开一个协程处理,那当处理完第一个任务之后,系统的内存占用会稳定下来,因为新开的协程和结束的协程速度一样了,就像水池,进水速度和出水速度一样快,当然就不存在内存溢出问题。而第三种方式因为工作池只有10个任务队列处理任务,造成等待阻塞的协程过多,所以最终也会导致系统内存崩溃。但是如果此时我们把工作池中的任务队列数增加到1000个,那么任务处理速度明显增加,就不会遇到系统崩溃。再以每个任务处理需要50秒为例子,第一种方式因为不断的开协程,没等第一个任务处理完成,内存就崩溃了。但是第三种方式,还是将任务队列加到1000个,即使处理每个任务需要50秒,内存也能扛得住,因为处理任务的协程同样也增加到1000个了,所以可以更加均衡的分配内存。
  6. 所以第三种方式的设计更加合理,可扩展性更强。

发表评论

表情:
评论列表 (有 0 条评论,103人围观)

还没有评论,来说两句吧...

相关阅读