V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
Gota
V2EX  ›  Go 编程语言

Go 1.21 新加的 log/slog 大家开始用了吗? 写了一个输出到阿里云日志的 writer 扩展, 顺带问个跟 slog 扩展相关的问题.

  •  1
     
  •   Gota ·
    gota33 · 2023-09-28 15:23:40 +08:00 · 3305 次点击
    这是一个创建于 421 天前的主题,其中的信息可能已经有所发展或是发生改变。

    先是分享下我写的扩展

    扩展 writer 的地址: https://github.com/gota33/aliyun-log-writer

    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    

    用的时候把参数中的 os.Stdout 换成这个 writer 就行了.

    然后问两个问题

    一个是 slog 想 Hook 的话就两个接口可以入手, slog.Handlerio.Writer.

    我这里选择了 io.Writer, 主要是因为自己写 slog.Handler 的话, 得把 slog.commonHandler 里的逻辑再实现一遍, 实在有点麻烦, 有没有像 logrus.Hook 那样比较简单的实现方式?


    还有一个问题跟 channel 有关.

    有这样的工作队列, submit() 提交任务, stop() 阻止新任务提交, 并处理完存量任务后返回.

    var (
    	ErrClosed = errors.New("closed")
    	chQuit    = make(chan struct{})
    	chData    = make(chan int, 10)
    )
    
    // func start() { ... }
    
    func submit(n int) error {
    	select {
    	case <-chQuit:
    		return ErrClosed
    	case chData <- n:
    		return nil
    	}
    }
    
    func stop() {
    	close(chQuit)
    	close(chData)
    	
    	for n := range chData {
    		// process data
    		_ = n
    	}
    }
    

    但是 selectswitch 不一样, case 的选择不是有序的, 导致有时候会选到第二个 case 然后 panic.

    所以后来把 stop() 改成这样了

    func stop() {
    	close(chQuit)
    
    	ch := chData
    	chData = make(chan int)
    
    	for len(ch) > 0 {
    		// process data
    		select {
    		case n := <-ch:
    			_ = n
    		default:
    		}
    	}
    }
    

    这种情况大家一般是怎么处理的?

    40 条回复    2023-10-16 13:53:34 +08:00
    hallDrawnel
        1
    hallDrawnel  
       2023-09-28 15:43:25 +08:00
    看你的逻辑是想阻止新任务提交,但是已经提交的任务要继续处理完是吧?

    如果可以可以控制上游,那应该把 close chan 返回回去,让上游自己停止提交。
    Gota
        2
    Gota  
    OP
       2023-09-28 15:48:07 +08:00
    @hallDrawnel 上游是 io.Writer 的 Write(), 用户用 slog 写日志的时候触发, 所以控制不了.
    wentx
        3
    wentx  
       2023-09-28 15:53:09 +08:00
    close(chQuit) 改成 chQuit <- struct{}{}

    试试
    hsfzxjy
        4
    hsfzxjy  
       2023-09-28 15:55:33 +08:00 via Android
    chData 不要 close 呗
    Gota
        5
    Gota  
    OP
       2023-09-28 16:01:31 +08:00
    #3 @wentx 只要 select 是无序的, 都有可能选到第二个. https://stackoverflow.com/questions/68650423/do-select-statements-guarantee-order-of-channel-selection

    #4 @hsfzxjy 结尾那套写法就没 close, 想看看有没有其他的处理方式.
    wentx
        6
    wentx  
       2023-09-28 16:23:21 +08:00
    @Gota 不不不,你 chQuit 是无缓冲的 channel, 你在往里写的时候,会阻塞,等到 submit 函数里面
    case <-chQuit:
    return ErrClosed

    这段执行到,才会走到下一步 close(chData),这个时候 submit 函数已经退出,所以 submit 不会 panic
    wentx
        7
    wentx  
       2023-09-28 16:24:25 +08:00
    @wentx 然后 stop 的最后再去 close(chQuit)
    Gota
        8
    Gota  
    OP
       2023-09-28 16:30:05 +08:00
    @wentx 但 submit() 不一定只有一个线程在调用. 而且如果在 stop 清空存量任务的过程中, 有另一个 submit() 调用, 还是会走到第二个 case 的吧? 可能还有一个问题, 如果没有 submit() 直接调用 stop() 程序就卡住了.
    hsfzxjy
        9
    hsfzxjy  
       2023-09-28 16:30:51 +08:00 via Android
    @wentx 要是此时没有 submit ,那 stop 就塞住了
    要是有多个 submit 在并发,那只有一个是正确的
    你这问题更大
    wentx
        10
    wentx  
       2023-09-28 16:42:29 +08:00
    @Gota 这个是队列设计的问题了,如何保证 producer 关闭后,其他 goroutine 调用 submit 没有副作用。
    wentx
        11
    wentx  
       2023-09-28 16:45:51 +08:00
    在这个场景下面,甚至都不需要使用 channel 来通知,你直接一个全局变量,在 submit 的时候,判断一下是否是 close 就可以了。
    Gota
        12
    Gota  
    OP
       2023-09-28 16:48:57 +08:00
    @wentx 没听太明白, 可以具体描述下吗?
    wentx
        13
    wentx  
       2023-09-28 17:05:02 +08:00
    @Gota
    ```
    var (
    ErrClosed = errors.New("closed")
    chQuit bool
    chData = make(chan int, 10)
    )

    // func start() { ... }

    func submit(n int) error {
    if chQuit {
    return ErrClosed
    }

    chData <- n:

    return nil
    }

    func stop() {
    chQuit = true
    close(chData)

    for n := range chData {
    // process data
    _ = n
    }
    }
    ```
    pkoukk
        14
    pkoukk  
       2023-09-28 17:10:02 +08:00
    stop 里不要 close(chData)啊 , defer close(chData) 不就可以了?
    Gota
        15
    Gota  
    OP
       2023-09-28 17:15:09 +08:00
    #13 @wentx 嗯... 并发环境下直接用 bool 值已经很危险了. 而且即使这样写, 程序卡住和 panic 的问题依然会存在.
    #14 @pkoukk defer 也不行, 哪怕 stop() 已经完全执行完了. 这时候调 submit() 还是有概率选到第二个 case.
    wentx
        16
    wentx  
       2023-09-28 17:22:36 +08:00
    @Gota 会啥会卡住 和 panic 呢?
    并发的话,atomic.LoadXXX 或者 atomic.StoreXXX
    Gota
        17
    Gota  
    OP
       2023-09-28 17:30:58 +08:00
    @wentx

    因为并发环境下函数执行随时会被挂起. 如果 submit 执行完 if 判断被挂起, 去执行 stop, 等恢复执行 submit 的时候就会 panic

    即使正常执行, 如果 submit 执行到 chData <- n 如果因为 buffer 满了开始等待, 此时执行 stop, 会 100% panic.
    pkoukk
        18
    pkoukk  
       2023-09-28 17:44:51 +08:00
    @pkoukk #14 stop 都执行完了,为什么还有线程可以调用 submit 啊?
    通道关闭了,还尝试往通道写入数据也是一种 panic 行为啊
    我们一般的做法是用 context.Done 作为标志,因为 contenxt 是可以逐层传递的
    当 sumbit 这边收到 Done 的时候,生产者同时也应该停止了
    Gota
        19
    Gota  
    OP
       2023-09-28 17:49:42 +08:00
    @pkoukk 因为这是个 Logger, 调用者从各个线程触发写日志的操作. 在主线程调用 stop() 的时候没法确保其他线程都提前停下来不写日志. 如果 writer 返回 ErrClose 的话 slog 是能处理的, 但直接 panic 掉就不行了.
    qing18
        20
    qing18  
       2023-09-28 18:37:11 +08:00
    为啥要 close chData 呢?
    ```
    func stop() {
    select {
    case <-chQuit:
    return
    default:
    close(chQuit)
    }
    }
    ```
    Gota
        21
    Gota  
    OP
       2023-09-28 18:47:22 +08:00
    @qing18 不 close(chData) 就是帖子末尾处的写法.
    这里的 submit 接口需要确保: 如果不返回错误的话, 写入的 data 是一定要被处理的.
    所以如果不 close 也不换成一个无缓冲 channel 的话, 会出现调用者认为数据成功提交了, 但实际上却没处理的情况.
    soap520
        22
    soap520  
       2023-09-28 19:57:25 +08:00
    ```
    func submit(n int) error {
    select {
    case <-chQuit:
    return ErrClosed
    default:
    }

    chData <- n
    return nil

    }

    func stop() {
    close(chQuit)

    for n := range chData {
    // process data
    _ = n
    }

    close(chData)
    }
    ```

    看看这样行不行,
    submit 里面先判断一下 chQuit 是不是已经 close 了。
    stop 处理完再 close chData 。

    一种可能让人看起来有点担心的执行顺序是,1. submit 里, 判断 chQuit 还没关闭。2. stop 里,执行 close(chQuit)。3. submit 里,接着 chData <- n 。不过应该在你的用例中年问题不大。
    Gota
        23
    Gota  
    OP
       2023-09-28 20:20:11 +08:00
    @soap520 你这里把 close(chData) 放到 for 循环之后执行, 那 for 循环就永远结束不了了.
    realpg
        24
    realpg  
       2023-09-28 20:30:57 +08:00
    没有
    1.21 自己瞎鸡儿改 xml excel 功能都用不了,被迫退回 1.20
    Gota
        25
    Gota  
    OP
       2023-09-28 20:40:06 +08:00
    @realpg 升到 1.21.1 试试呢? 一般我都等大版本之后的一个小版本才开始正式用, 最开始那个版本确实容易出一些小问题.
    soap520
        26
    soap520  
       2023-09-28 20:43:37 +08:00
    @Gota 确实,我把 stop 改成这样是不是就可以了。
    ```
    func stop() {
    close(chQuit)

    for {
    select {
    case n := <-chData:
    _ = n
    default:
    close(chData)
    return
    }
    }
    }
    ```
    realpg
        27
    realpg  
       2023-09-28 20:48:47 +08:00
    @Gota #25
    之前说了,下个小版本会合并修复
    还不是 golang 自己修的,excel 那个库大佬给提的 pr
    没有特别频繁检查版本的习惯,过两天再说
    Gota
        28
    Gota  
    OP
       2023-09-28 20:55:40 +08:00
    @soap520 那就剩下 #22 里你自己提到的那个 panic 问题了. 这里的用例是 slog 的 hook, 所以 submit 可能会在任意线程中被调用, 数量和时机都是没办法控制的, 也就是说 submit 里那个过了 if 之后的挂起其实很容易触发.
    soap520
        29
    soap520  
       2023-09-28 21:14:31 +08:00
    @Gota 明白了,那我把 stop 最后 close(chData)去掉是不是就行了。去掉之后看起来和你 1L 的方法就差不多了,只是没有重新给 chData 赋值(我也不清楚 slog hook 的用例里需不需要再给 chData 一个 channel )。
    如果要很“完美”的话,我除了弄一个锁把 submit 里的 read chDone, enqueue data 保护起来之外想不到更好的办法了。
    Gota
        30
    Gota  
    OP
       2023-09-28 21:25:07 +08:00
    @soap520 哈哈, 异步相关的东西确实比较烧脑. 1L 重新赋值一个无缓冲 channel 是为了防止 stop 之后有数据进入 chData 却没人来处理, 随着主线程退出这份数据就丢掉了. 至于加锁, 不到万不得已最好别加, 否则每调一次 Log 整个应用都被锁一下, 就有点夸张了.
    nuk
        31
    nuk  
       2023-09-29 03:58:37 +08:00
    显然需要一个临界区,close channel 的时候要等待其他所有线程离开临界区,不管用 channel 还是 lock 来实现代价都蛮大,单个 bool 或者单个 channel close 都不可能实现,感觉 stop 就直接 drop 掉 log 会比较容易简单。
    Gota
        32
    Gota  
    OP
       2023-09-29 10:20:05 +08:00 via iPhone
    @nuk 直接丢数据肯定不行,正文末尾的写法也不用等其他线程呀。
    nuk
        33
    nuk  
       2023-09-29 12:43:34 +08:00
    @Gota 但是这个也不能保证 close channel 之后没有线程去写啊。。
    Gota
        34
    Gota  
    OP
       2023-09-29 12:53:28 +08:00 via iPhone
    @nuk 没有 close ,是换成了一个无缓冲的 channel ,从而确保能 select 到第一个 case 。
    mapleray
        35
    mapleray  
       2023-10-01 11:24:40 +08:00
    @Gota #30 如果不想加锁,又不想丢日志,只能想办法把关闭操作抛给使用者了。

    目前使用方式 uber-go/atomic.Bool + 缓冲 channel ,退出时等待 channel 清理完才退出。
    paceewang1
        36
    paceewang1  
       2023-10-07 14:42:42 +08:00
    这个场景,可以用乐观锁吧,atomic ,CAS
    Jrue0011
        37
    Jrue0011  
       2023-10-12 17:26:15 +08:00
    读写锁的场景?
    stop 写锁更新状态
    submit 读锁判断状态
    Gota
        38
    Gota  
    OP
       2023-10-12 17:47:54 +08:00
    @paceewang1 #36
    @Jrue0011 #37
    见 #30 的回复,帖子末尾的写法是不需要锁的
    paceewang1
        39
    paceewang1  
       2023-10-16 11:19:01 +08:00
    @Gota CAS 也不是在 submit 里面加锁,我是指在 stop 里面加锁然后转换状态;相当于引入一个状态机而已,submit 只需要加一个状态判断就可以了,看了一下就和#13 的代码大致一样吧,但是 stop 方法先处理 chData 再关闭:
    ```
    func stop() {
    if ok := CAS(chQuit); !ok {
    // return error
    }

    for n := range chData {
    // process data
    _ = n
    }

    close(chData)
    }
    ```
    Gota
        40
    Gota  
    OP
       2023-10-16 13:53:34 +08:00 via iPhone
    @paceewang1 见#23 楼和后续的回复,还是有点问题
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2831 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 07:17 · PVG 15:17 · LAX 23:17 · JFK 02:17
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.