V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
sunshinev
V2EX  ›  程序员

Go 协程池解析~通俗易懂

  •  
  •   sunshinev ·
    sunshinev · 2020-11-19 19:16:13 +08:00 · 1802 次点击
    这是一个创建于 1490 天前的主题,其中的信息可能已经有所发展或是发生改变。

    协程池主要是为了减少 go 协程频繁创建、销毁带来的性能损耗,虽然可以忽略不计,但是网上说特殊情况还是有用的。

    那这个协程池通俗易懂来讲,比如老板给员工分配任务:

    老板领了一堆任务,得找工人干活呀, 那领导就拿出一个任务,给一个空闲的员工 A,再把下一个任务,给另外一个空闲的员工 B 。

    这时候 A 或者 B,指不定谁先忙完了

    如果有人忙完了,领导就把下一个任务,给先忙完的人。A/B 就是协程池里面的两个协程

    下面这段代码,完成了如下功能

    1. 协程池数量上限控制
    2. 协程空闲清理,释放内存
    3. 协程复用
    package gopool
    
    import (
    	"context"
    	"log"
    	"sync"
    	"time"
    )
    
    type Task func()
    
    // boss 老板
    type GoPool struct {
    	MaxWorkerIdleTime time.Duration // worker 最大空闲时间
    	MaxWorkerNum      int32         // 协程最大数量
    	TaskEntryChan     chan *Task    // 任务入列
    	Workers           []*worker     // 已创建 worker
    	FreeWorkerChan    chan *worker  // 空闲 worker
    	Lock              sync.Mutex
    }
    
    const (
    	WorkerStatusStop = 1
    	WorkerStatusLive = 0
    )
    
    // 干活的人
    type worker struct {
    	Pool         *GoPool
    	StartTime    time.Time  // 开始时间
    	TaskChan     chan *Task // 执行队列
    	LastWorkTime time.Time  // 最后执行时间
    	Ctx          context.Context
    	Cancel       context.CancelFunc
    	Status       int32 // 被过期删掉的标记
    }
    
    var defaultPool = func() *GoPool {
    	return NewPool()
    }()
    
    // 初始化
    func NewPool() *GoPool {
    	g := &GoPool{
    		MaxWorkerIdleTime: 10 * time.Second,
    		MaxWorkerNum:      20,
    		TaskEntryChan:     make(chan *Task, 2000),
    		FreeWorkerChan:    make(chan *worker, 2000),
    	}
    
    	// 分发任务
    	go g.dispatchTask()
    
    	//清理空闲 worker
    	go g.fireWorker()
    
    	return g
    }
    
    // 定期清理空闲 worker
    func (g *GoPool) fireWorker() {
    	for {
    		select {
    		// 10 秒执行一次
    		case <-time.After(10 * time.Second):
    			for k, w := range g.Workers {
    				if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime {
    					log.Printf("overtime %v %p", k, w)
    					// 终止协程
    					w.Cancel()
    					// 清理 Free
    					w.Status = WorkerStatusStop
    				}
    			}
    
    			g.Lock.Lock()
    			g.Workers = g.cleanWorker(g.Workers)
    			g.Lock.Unlock()
    		}
    	}
    }
    
    // 递归清理无用 worker
    func (g *GoPool) cleanWorker(workers []*worker) []*worker {
    	for k, w := range workers {
    		if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime {
    			workers = append(workers[:k], workers[k+1:]...) // 删除中间 1 个元素
    			return g.cleanWorker(workers)
    		}
    	}
    
    	return workers
    }
    
    // 分发任务
    func (g *GoPool) dispatchTask() {
    
    	for {
    		select {
    		case t := <-g.TaskEntryChan:
    			log.Printf("dispatch task %p", t)
    			// 获取 worker
    			w := g.fetchWorker()
    			// 将任务扔给 worker
    			w.accept(t)
    		}
    	}
    }
    
    // 获取可用 worker
    func (g *GoPool) fetchWorker() *worker {
    	for {
    		select {
    		// 获取空闲 worker
    		case w := <-g.FreeWorkerChan:
    			if w.Status == WorkerStatusLive {
    				return w
    			}
    		default:
    			// 创建新的 worker
    			if int32(len(g.Workers)) < g.MaxWorkerNum {
    				w := &worker{
    					Pool:         g,
    					StartTime:    time.Now(),
    					LastWorkTime: time.Now(),
    					TaskChan:     make(chan *Task, 1),
    					Ctx:          context.Background(),
    					Status:       WorkerStatusLive,
    				}
    				ctx, cancel := context.WithCancel(w.Ctx)
    
    				w.Cancel = cancel
    				// 接到任务自己去执行吧
    				go w.execute(ctx)
    
    				g.Lock.Lock()
    				g.Workers = append(g.Workers, w)
    				g.Lock.Unlock()
    
    				g.FreeWorkerChan <- w
    
    				log.Printf("worker create %p", w)
    			}
    		}
    	}
    }
    
    // 添加任务
    func (g *GoPool) addTask(t Task) {
    	// 将任务放到入口任务队列
    	g.TaskEntryChan <- &t
    }
    
    // 接受任务
    func (w *worker) accept(t *Task) {
    	// 每个 worker 自己的工作队列
    	w.TaskChan <- t
    }
    
    // 执行任务
    func (w *worker) execute(ctx context.Context) {
    	for {
    		select {
    		case t := <-w.TaskChan:
    			// 执行
    			(*t)()
    			// 记录工作状态
    			w.LastWorkTime = time.Now()
    			w.Pool.FreeWorkerChan <- w
    		case <-ctx.Done():
    			log.Printf("worker done %p", w)
    			return
    		}
    	}
    }
    
    // 执行
    func SafeGo(t Task) {
    	defaultPool.addTask(t)
    }
    
    
    4 条回复    2020-11-20 10:01:57 +08:00
    pabupa
        1
    pabupa  
       2020-11-19 19:53:01 +08:00
    越俎代庖
    fuis
        2
    fuis  
       2020-11-19 21:33:32 +08:00   ❤️ 1
    脱裤子放屁
    Weixiao0725
        3
    Weixiao0725  
       2020-11-19 21:39:17 +08:00
    可能协程比线程轻量,不需要池子缓存?
    koujyungenn
        4
    koujyungenn  
       2020-11-20 10:01:57 +08:00
    不懂就问,协程池有什么应用场景?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5319 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 08:35 · PVG 16:35 · LAX 00:35 · JFK 03:35
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.