V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
shaoyie
V2EX  ›  Go 编程语言

[go]golang 的协程池本应该是这样的

  •  
  •   shaoyie · 229 天前 · 1920 次点击
    这是一个创建于 229 天前的主题,其中的信息可能已经有所发展或是发生改变。

    看过了一下 star 比较高的协程池实现,还有字节开源的实现,完全是 java/c++之类的外行实现思路

    协程/线程池,最基本的元件 就是 队列 + 协程/线程,M:N 模型

    这两个组件在 go 里边天生就有啊,为什么再搞一套 task queue 呢?

    控制队列容量:make(chan, cap) 第二参数就可以

    想要控制协程/线程数量,再辅助一个 chan 就可以了,

    代码实现如下,100 行搞定:

    我把它放到 github 上 gopool 喜欢的老铁可以给个 star

    // GoPool is a minimalistic goroutine pool that provides a pure Go implementation
    type GoPool struct {
    	noCopy
    
    	queueLen atomic.Int32
    	doTaskN  atomic.Int32
    	workerN  atomic.Int32
    	options  Options
    
    	workerSem chan struct{}
    	queue     chan func()
    }
    
    // NewGoPool provite fixed number of goroutines, reusable. M:N model
    //
    // M: the number of reusable goroutines,
    // N: the capacity for asynchronous task queue.
    func NewGoPool(opts ...Option) *GoPool {
    	opt := setOptions(opts...)
    	if opt.minWorkers <= 0 {
    		panic("GoPool: min workers <= 0")
    	}
    	if opt.minWorkers > opt.maxWorkers {
    		panic("GoPool: min workers > max workers")
    	}
    	p := &GoPool{
    		options:   opt,
    		workerSem: make(chan struct{}, opt.maxWorkers),
    		queue:     make(chan func(), opt.queueCap),
    	}
    	for i := int32(0); i < p.options.minWorkers; i++ { // pre spawn
    		p.workerSem <- struct{}{}
    		go p.worker(func() {})
    	}
    	go p.shrink()
    	return p
    }
    
    // QueueFree returns (capacity of task-queue - length of task-queue)
    func (p *GoPool) QueueFree() int {
    	return int(p.options.queueCap - p.queueLen.Load())
    }
    
    // Workers returns current the number of workers
    func (p *GoPool) Workers() int {
    	return int(p.workerN.Load())
    }
    
    // Go submits a task to this pool.
    func (p *GoPool) Go(task func()) {
    	if task == nil {
    		panic("GoPool: Go task is nil")
    	}
    	select {
    	case p.queue <- task:
    		p.queueLen.Add(1)
    	case p.workerSem <- struct{}{}:
    		go p.worker(task)
    	}
    }
    
    func (p *GoPool) worker(task func()) {
    	p.workerN.Add(1)
    	defer func() {
    		<-p.workerSem
    		p.workerN.Add(-1)
    		if e := recover(); e != nil {
    			if p.options.panicHandler != nil {
    				p.options.panicHandler(e)
    			}
    		}
    	}()
    
    	for {
    		task()
    		task = <-p.queue
    		if task == nil {
    			break
    		}
    		p.doTaskN.Add(1)
    		p.queueLen.Add(-1)
    	}
    }
    func (p *GoPool) shrink() {
    	ticker := time.NewTicker(p.options.shrinkPeriod)
    	defer ticker.Stop()
    
    	for {
    		select {
    		case <-ticker.C:
    			doTaskN := p.doTaskN.Load()
    			p.doTaskN.Store(0)
    			if doTaskN < p.options.tasksBelowN {
    				closeN := p.workerN.Load() - p.options.minWorkers
    				for closeN > 0 {
    					p.queue <- nil
    					closeN--
    				}
    			}
    		}
    	}
    }
    
    27 条回复    2023-09-18 18:42:57 +08:00
    csh010101
        1
    csh010101  
       229 天前
    好好好
    Nazz
        2
    Nazz  
       229 天前   ❤️ 1
    应该是这样的:

    ```go
    type channel chan struct{}

    func (c channel) add() { c <- struct{}{} }

    func (c channel) done() { <-c }

    func (c channel) Go(f func()) {
    c.add()
    go func() {
    f()
    c.done()
    }()
    }
    ```
    lan171
        3
    lan171  
       229 天前
    都协程了还要池化?
    ylc
        4
    ylc  
       228 天前 via Android
    @lan171 并发太大时,代码没写好的话会造成协程数激增,池化能控制数量,而且性能也能得到提升
    lovelylain
        5
    lovelylain  
       228 天前 via Android
    @Nazz 看上去好像也行,OP 代码这么多是完善了哪些方面?
    shaoyie
        6
    shaoyie  
    OP
       228 天前
    @lovelylain 这不算多吧,这已经算是最精简的了吧,我就是因为看到别人实现的太复杂了,所以写了一个,功能就是可以定时收缩,加了几个计数而已
    shaoyie
        7
    shaoyie  
    OP
       228 天前
    @shaoyie 抱歉回复错了
    Nazz
        8
    Nazz  
       228 天前 via Android
    @lovelylain 最精简的版本,没有之一
    Mohanson
        9
    Mohanson  
       228 天前   ❤️ 1
    XY 问题.

    都协程了就没必要池化, 协程模型需要的是速率控制. 用令牌桶算法 10 行代码就能解决问题. 谈起协程就自动类比为进程, 然后想当然认为应该有一个"协程池", "完全是 java/c++之类的外行实现思路"
    dyllen
        10
    dyllen  
       228 天前
    之前看了有的框架有协程池化的组件,看里面的测试结果,除了内存占用优势,性能上没任何优势。
    keakon
        11
    keakon  
       228 天前
    @shaoyie @Nazz 当队列满的时候,Go() 会阻塞调用线程,而不是加到任务队列里立刻返回,后面空闲了再由 worker 线程去执行。
    troywinter
        12
    troywinter  
       228 天前
    一个 semaphore 就能解决的问题,不需要写这么多代码
    shaoyie
        13
    shaoyie  
    OP
       228 天前
    @dyllen 是的,不会有肉眼可见的性能提升,至于协程复用啥的只是减少一些内存分配而已,主要还是控制数量
    shaoyie
        14
    shaoyie  
    OP
       228 天前
    @troywinter 有 sem 不也得有个 job 队列吗
    shaoyie
        15
    shaoyie  
    OP
       228 天前
    @Mohanson 只是一个小品,总有它存在的应用场景。
    kneo
        16
    kneo  
       228 天前 via Android
    国内大厂自己轮的东西基本都是性能驱动。如果不是为了解决他们自己生产环境里遇见的性能问题,应该不会搞这么个东西出来。
    ZSeptember
        17
    ZSeptember  
       228 天前
    学过 Go 的都能写出这个代码,,但是并不一定能写出那些复杂的 pool 。
    你这个本质上是一个并发控制而已,最基本的优雅关闭也没处理。
    shaoyie
        18
    shaoyie  
    OP
       228 天前
    @kneo 是的,自己造的轮子肯定是为了解决自己生产环境的问题,不用考虑通用性
    shaoyie
        19
    shaoyie  
    OP
       228 天前
    @ZSeptember 也不一定,有些人的实现思路就够 gopher ,自己实现一套条件变量 + queue ,我是觉得没必要,go 天生就带这些东西。另外,你说的优雅关闭 是指关闭 pool 吗?我是觉得没必要,本来 go pool 就不是很必须,再加上一个动态的 pool 就更没必要了
    bv
        20
    bv  
       228 天前
    @Nazz 代码就应该简单直接
    TanKuku
        21
    TanKuku  
       228 天前 via Android
    不懂就问,也没改 runtime 复用什么东西,就单纯控制数量也叫池了吗?
    Pythoner666666
        22
    Pythoner666666  
       228 天前
    所有的池化技术,无论是 mysql redis 还是 http 的池化技术核心都是复用连接再然后是控制连接的数量不至于把 server 的连接资源都耗尽,私以为你这顶多算是控制并发。但是如果只需要控制并发你这未免也太繁琐了
    shaoyie
        23
    shaoyie  
    OP
       228 天前
    @Pythoner666666 是的用个计数器也可以达到效果,但这样不就是更通用一点嘛,
    huija
        24
    huija  
       228 天前
    官方池化实现不给出来了么,sync.Pool ,如果用不到的,那就说明压根不需要池化(比如协程)
    dyllen
        25
    dyllen  
       227 天前
    @TanKuku 他这个代码控制了协程数量,也复用了协程,shrink 这个函数就是发消息结束超时生存的协程的,worker 里面没任务会阻塞等待任务来,直到 shrink 发了结束信号退出。
    lysS
        26
    lysS  
       226 天前
    为什么要控制协程数量?即使要限制也是限制请求并发
    index90
        27
    index90  
       223 天前
    协程池与最大并发数控制傻傻分不清?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2686 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 37ms · UTC 15:30 · PVG 23:30 · LAX 08:30 · JFK 11:30
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.