V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
bthulu
V2EX  ›  程序员

一个线程更新数据, 多个线程读数据, 这种怎么保证线程安全?

  •  
  •   bthulu · 2024-02-27 18:27:45 +08:00 · 4409 次点击
    这是一个创建于 372 天前的主题,其中的信息可能已经有所发展或是发生改变。

    .Net 相关
    线程 0 调用硬件异步 API, 拿到数据后, 从 devices 根据 id 取到 Device 实例, 更新硬件最新数据到这个实例上.
    同时有多个监控线程每隔 100 毫秒读取一次所有设备状态, 并根据设备状态执行一次或多次耗时较长的异步操作, 并在异步操作执行完成后, 对硬件数据进行部分更新.
    这个要怎么做才能确保线程安全?

        // 设备集中存储处
        ConcurrentDictionary<int, Device> devices = new();
    
        // 设备类
        public class Device
        {
            public int Id { get; init; }
            public bool Enable { get; set; }
            public string Group { get; init; } = "";
            public int[] Locations { get; init; } = Array.Empty<int>();
            public int Margin { get; set; }
            public int RsCount { get; init; }
            public bool EnableSplit { get; init; }
            public int DynamicMerge { get; set; }
            public int Width { get; set; }
            public int Length { get; set; }
            public int LeftLength { get; set; }
            public int LoadEdge { get; set; }
            public int Dest { get; set; }
        }
    
        // 数据更新线程相关
        public Thread0Executor()
        {
        	public async Task Execute()
            {
                var data = await GetDataFromHardwareApi();
                Update(data, devices);
            }
        }
    
        // 数据监控处理线程相关
        public MonitorThreadExecutor()
        {
        	public async Task Execute()
            {
                Resolve(devices);
                await Operate0();
                DoSomething();
                await Operate1();
                DoSomething();
            }
            
            public async Task Operate0()
            {
                try
                {
                	await CallApi();
                	Update(devices);
                }
                catch()
                {
                	UpdateIfError(devices);
                }
            }
        }
    

    异步方法中根本没办法使用锁, 顶多用用信号量 Semaphore 来代替锁.

    这里也不能对整个 Execute 方法用锁. 因为监控线程中的异步操作耗时是不一定的, 可能因为网络问题花个几分钟都有可能.

    貌似也没法仅对非异步代码进行加锁, 因为同步异步代码是混杂在一块的, 没法单独对非异步代码进行加锁.

    也考虑过弄个类似 ANDROID 里的 UI 线程和子线程的东西, 数据读取和更新都放在 UI 线程里, 异步操作放在子线程里. 但是搞了半天没搞出来.

    最后的最后, 实在没办法了, 我在想要不把 Device 的所有属性都加一个 volatile 关键字. 我这里更新数据的时候基本不会看原来数据是多少, 不会出现count++这种情况, 貌似 volatile 是可行的. 但是实际这个 Device 有几十个属性, 并且有一两千个 Device, 如果每个属性都加一个 volatile 关键字, 那就是 2000*50=100 万个属性带 volatile 了. 这会不会极大地影响程序运行性能?

    47 条回复    2024-03-08 14:44:46 +08:00
    svnware
        1
    svnware  
       2024-02-27 18:39:51 +08:00
    单写多读不就已经是线程安全的了么。。。
    wamson
        2
    wamson  
       2024-02-27 18:50:10 +08:00 via iPhone
    看标题,寻思,这不就是个读写锁么😳
    laminux29
        3
    laminux29  
       2024-02-27 18:50:53 +08:00
    你这不是一线程更新,多线程读,而是多线程读写。

    这种问题,没把握的话,直接丢给 MSSQL ,如果对数据一致性要求严谨,用序列化级别的事务去操作数据。
    如果要求不严谨,直接用 EF 的乐观锁或最终一致性。
    wayne1007
        4
    wayne1007  
       2024-02-27 18:51:15 +08:00
    double buffer ,写线程 先 load 数据,然后和更新 buffer 的 idx 0->1 或者 1->0
    wayne1007
        5
    wayne1007  
       2024-02-27 18:52:34 +08:00
    @wayne1007 读线程,直接按当前的 idx 读数据,就 idx 切换的瞬间,可能不一致,用信号量也不一定能完全保证,看你这个场景够不够用,不用锁的话
    bthulu
        6
    bthulu  
    OP
       2024-02-27 18:54:03 +08:00
    @wamson 读写锁不行的, 异步调用有可能耗时特别长. 总不能某个设备接口耗时过长时, 其他设备都不能用了吧?
    namonai
        7
    namonai  
       2024-02-27 18:54:37 +08:00
    @svnware 不一样的。比如一段数据,写入一半的时候被读取,读到的就是 broken 的数据。哪怕是对单个字节进行读写操作,也可能存在问题,所以至少要使用原子操作进行保护。
    codcrafts
        8
    codcrafts  
       2024-02-27 18:54:38 +08:00
    我没太懂,你这种情况下会有线程安全问题吗?我感觉不会
    bthulu
        9
    bthulu  
    OP
       2024-02-27 18:55:33 +08:00
    @wayne1007 如果用锁, 只要能保证某个线程调用异步操作耗时特别长时, 其他线程可以干活而不是在那干等着就行.
    bthulu
        10
    bthulu  
    OP
       2024-02-27 18:56:43 +08:00
    @laminux29 丢 SQL 里不到万不得已不考虑, 尽量在内存这一层面解决, 实在么办法了再考虑丢 SQL 里去.
    guo4224
        11
    guo4224  
       2024-02-27 18:57:25 +08:00
    临界区……
    namonai
        12
    namonai  
       2024-02-27 19:04:30 +08:00   ❤️ 1
    @bthulu 你可以试试 trible buffer ,编号 0 、1 、2 ,读线程实现一个 getIndex(),初始的 valid index 是 0 ,需要对数据更改的时候,index + 1 ,往 1 上写,写完了以后 valide index 也更新到 1 ,这个时候 0 和 1 的数据都是有效的,过了一小段时间,0 就没人访问了。在这段时间里如果又有需要写入的数据,那就往 2 上写。这样子可以始终保证读到的数据是完整的。可以把写入操作放在一个单独的线程里进行,其他线程如果有修改数据的需要,就通过队列传递数据过去。
    billccn
        13
    billccn  
       2024-02-27 19:12:20 +08:00
    `devices`这个字典一定程度上就是一个手搓的数据库,你这个里面要考虑的情况很多,比如:

    1. 字典需要动态增删吗?不需要的话这个`Concurrent`是徒增开销
    2. 字典里面的值(就是每个 Device 实例)会被多个线程同时引用吗?字典的`Concurrent`是不会管里面的值是不是 thread-safe
    3. Device 实例需要 referential equivalence 吗?不需要的话建议把这个类变成只读的,每次更新的时候直接替换整个实例最安全
    4. Device 与 Device 之间有关系吗?有的话你可能需要考虑如何 atomically 更新这个字典
    bthulu
        14
    bthulu  
    OP
       2024-02-27 19:14:36 +08:00
    @billccn 字典可以保证 Device 实例引用线程安全. 这里主要的问题就是这个 Device 实例上的茫茫多的属性怎么保证线程安全
    geelaw
        15
    geelaw  
       2024-02-27 19:48:17 +08:00 via iPhone
    >异步方法中根本没办法使用锁, 顶多用用信号量 Semaphore 来代替锁.
    >貌似也没法仅对非异步代码进行加锁, 因为同步异步代码是混杂在一块的, 没法单独对非异步代码进行加锁.

    规则是 lock 里面不可以有 await (可以实现,但是几乎总是错误的,因此语言层面拒绝这样做),在 async 方法中 lock 是完全 OK 的。

    ConcurrentDictionary 已经确保每次访问它的成员都是原子的,然而这不代表对它的访问逻辑就已经线程安全,比如一段代码里连续访问它的成员两次,那么在中途其他线程可能已经修改过了这个字典。说这点是预防针,楼主在 #14 提到这是为了确保 Device 存在 devices 里面的引用安全。

    要保证每个 Device 实例线程安全,最简单的思路是细粒度,比如操作每个 device 的时候 lock 之。如果操作过程需要异步,那么我想象中楼主说有多个线程查看 devices 并做一些事,意思是如果 A 线程处理了 device1 则 B 线程应该跳过并处理 device2 ,这种情况下因为 device 被占用时无需等待,所以可以用 interlocked operation 实现:

    1. 在 device 上加上一个 int 字段 InUse ,表示目前是否在处理它,初始化为 0 。
    2. 要访问一个 device ,先用 Interlocked.Exchange 查看 InUse 并设置为 1 ,如果 InUse 之前也是 1 ,则跳过。
    3. 否则 InUse 之前是 0 并且被原子设置为 1 ,此时当前方法认为自己接管该 device 并开始异步硬件 API 操作,在 await 结束、处理完 device 后,重新 Exchange 把 InUse 还原为 0 。
    dogfeet
        16
    dogfeet  
       2024-02-27 20:18:47 +08:00
    如果更新的时候不看原来的数据,且 [多个监控线程每隔 100 毫秒读取一次所有设备状态, 并根据设备状态执行一次或多次耗时较长的异步操作] 这个过程中数据变化了也没关系的话,可以考虑直接将 Device 变为不可变(所有字段都 readonly)。

    C# 不是特别熟了,devices 本身读写是线程安全的,里面的 device 只要每次更新的时候是替换一个新的不可变对象,这在 java 中是线程安全的。

    几十个字段的拷贝,应该也还好。
    me1onsoda
        17
    me1onsoda  
       2024-02-27 21:19:50 +08:00
    @svnware 这是什么道理,难道一个写者一个读者就不要做互斥访问临界区了?
    zzzyk
        18
    zzzyk  
       2024-02-27 22:53:00 +08:00
    无锁队列看行不行。
    CLMan
        19
    CLMan  
       2024-02-27 23:30:10 +08:00   ❤️ 1
    这个问题的核心是你业务逻辑的“线程安全”是如何定义的(只有你自己知道),至于是用锁、读写锁、Semaphore 、无锁、volatile 等,这些纯粹是实现细节,取决于你对并发相关基础知识(操作系统领域)以及特定语言(这里是.NET )相关库和语法的熟悉程度。

    由于不了解你的业务逻辑实现细节,我只能提问:

    - 线程 0 是只写吗,是否依赖 Device 当前的状态?
    - 监控线程统计所有设备状态时,以及执行异步操作时,是否允许线程 0 进行更新?
    - 监控线程的异步任务与线程 0 是否存在写入相同的内存区域的情况?
    - 监控线程的异步任务是否可能执行超过 100 毫秒,如果超过,是否允许多个监控线程的异步任务同时执行?如果允许,它们的写是否冲突?

    你至少需要补充以上细节,才能让回答者更好的帮你解决问题。
    iceheart
        20
    iceheart  
       2024-02-28 07:22:25 +08:00 via Android
    多个副本数据策略。

    属性数据放两个以上副本,由一个 volatile 索引指定最新副本。

    写线程更新副本后再更新索引。

    读线程按索引访问副本数据。
    bthulu
        21
    bthulu  
    OP
       2024-02-28 08:18:03 +08:00
    @CLMan 线程 0 只写, 不依赖 Device 当前的状态
    监控线程执行异步操作时,允许线程 0 进行更新
    监控线程的异步任务跟线程 0 写入的就是相同的内存区域
    监控线程的异步任务是轮询执行的, 执行完毕后等 100 毫秒再次执行,且执行时间可能长达几分钟。允许多个监控线程的异步任务同时执行。他们的写存在冲突。
    xuanbg
        22
    xuanbg  
       2024-02-28 08:48:44 +08:00
    这……单写不是已经线程安全了么?看内容貌似又不是,OP 还是直接说需求吧,这问题都说不清楚,实在让人挠头。
    zzl22100048
        23
    zzl22100048  
       2024-02-28 08:58:13 +08:00
    layxy
        24
    layxy  
       2024-02-28 09:16:09 +08:00
    又不是多写,单写多读没啥线程安全问题吧
    1008610001
        25
    1008610001  
       2024-02-28 09:26:55 +08:00
    看描述。。。只有一个线程负责写数据 不存在线程安全的问题啊
    lakehylia
        26
    lakehylia  
       2024-02-28 09:29:46 +08:00
    简单点,直接用事务线程不行么?其他多个线程都是提交事务给事务线程负责读写,然后事务线程回调结果。
    4kingRAS
        27
    4kingRAS  
       2024-02-28 09:56:15 +08:00
    读写操作是原子的吗?原子的,一个线程写根本没多线程问题
    如果不是原子的,先尝试做到原子,做不到就读写时加锁
    wu00
        28
    wu00  
       2024-02-28 10:19:56 +08:00
    是不是想太多了?
    ConcurrentDictionary 本就是线程安全集合,TryAdd(),TryUpdate()都是原子操作。
    所以就算你 Thread0 、Monitor1 、Monitor2 三个线程并发 ConcurrentDictionary 进行操作,也不会出现线程安全问题;会出现的是你业务上的“线程安全”问题:到底谁的优先级更高?
    cloud107202
        29
    cloud107202  
       2024-02-28 11:01:46 +08:00
    这里可以考虑做个线程读写分离。没接触过 .Net 我会用 Java 的 type 与 API 描述,自行对应一下:

    首先把成员 devices 与相关的操作都封装到一个类型里面,对外暴露一个 public 的阻塞队列成员变量,Java 的话我会用有阻塞语义的 ArrayBlockingQueue. 这个类型在构建的时候(onCreation),启动一个单线程去 poll 这个 Queue. devices 的更新逻辑都由这个单线程完成

    外面的异步操作获取到设备信息后,以 ImmutableEvent 的形式把必要的信息封装描述好,放入队列. 形如 ArrayBlockingQueue<DeviceUpdatedEvent> 这样子,里面的单线程 poll 到事件直接更新 Dictionary 即可。

    最后剩下这个“多个监控线程每隔 100 毫秒读取一次所有设备状态” ,这里简单起见可以将 devices 也设置成 public ,直接在外面访问 devices 成员(重点是:一定要约定好,在 poll 的线程之外的逻辑,全部只能 read 这个 ConcurrentDictionary )。因为 Dictionary 本身使用了线程安全的 ConcurrentDictionary ,对它的 CRUD 是线程安全的,只需要防止外面监控程序获取到某个尚未更新完成的某个 Device 实例(有点像 DB 的脏读),这里给 Device 每个属性设置 volatile 肯定是不合适的:可以考虑前面提到的,在负责 poll 的单线程,获取到更新事件后,不要就地改变 device 对象本身的属性值,而是以 deepCopy 的方式创建个全新的 Device 实例。然后用 ConcurrentDictionary.put(key, value) 的 API 直接更新整个 Device 对象,规避外部监控线程在 scan 的时候,获取到属性更新不完整的 stale state
    jones2000
        30
    jones2000  
       2024-02-28 11:13:29 +08:00
    奇偶读写,2 个内存块( 0 号,1 号),0 号写的时候,1 号读。1 号写的时候,0 号读。
    dode
        31
    dode  
       2024-02-28 14:06:07 +08:00
    调整锁的粒度
    liuky
        32
    liuky  
       2024-02-28 14:30:22 +08:00
    使用阻塞队列 BlockingCollection 试试,
    qping
        33
    qping  
       2024-02-28 14:31:33 +08:00
    我感觉 27 楼说的做到写原子操作就可以了

    Device 应该是一个 immutable 得对象,不可变
    想要更新只能 clone ,然后 update 到字典中
    sparklee
        34
    sparklee  
       2024-02-28 14:37:27 +08:00
    单个线程更新, 所有需要更新的操作都做成 任务 都放到任务队列
    yansideyu
        35
    yansideyu  
       2024-02-28 14:40:53 +08:00
    楼主的问题是所有线程更新数据的时候,需要更新多个属性,怎么避免没有全部更新完的情况下,其他线程读取了数据。拿到了脏数据?
    i8086
        36
    i8086  
       2024-02-28 14:41:51 +08:00
    楼主意思应该是多线程更新集合里 Device 类型属性值的问题?

    用 volatile 就好了,目前是最方便。
    qping
        37
    qping  
       2024-02-28 14:44:47 +08:00
    又仔细看了下,你是多线程写啊,MonitorThread (多个)和 Thread0 都能更新, 那存在一些问题

    1. MonitorThread 和 Thread0 是否会写入冲突
    如果 MonitorThread 和 Thread0 写入相同得内存,那感觉就是设计有问题
    那我假想他们不会冲突

    2. 多个 MonitorThread 冲突的问题
    多个 MonitorThread 每次都更新全部的 devices ,这个设计也很奇怪

    假设已经做到通过锁或其他手段,保证一个 MonitorThread 更新是原子级别的。
    MonitorThread A 先启动,MonitorThread B 后启动,因为等待时间长 A 的结果却比 B 后写入,这样没有问题吗?

    我觉得,应该可以有多个 MonitorThread 线程,但是每个 Device 只能同一时间被一个 MonitorThread 更新
    实现方法上,可以用队列,每次更新 MonitorThread 从队列中取一个 Device ,如果更新完重新还回
    yicong135
        38
    yicong135  
       2024-02-28 15:24:41 +08:00
    shapper
        39
    shapper  
       2024-02-28 16:23:22 +08:00
    task 本身就是开新线程,减少锁粒度,锁 devices 就可以,把具体 device 分配到 task ,task 只修改自己引用的 device ,不修改 devices ;
    dogfeet
        40
    dogfeet  
       2024-02-28 17:32:04 +08:00
    @bthulu 看起来就是写不依赖读,或者说写需要的读状态可以是旧数据(只需完整,无需最新)。那么单纯的将 Device 变为不可变就行。ConcurrentDictionary 单纯的读写本身是原子的,查了一下,不可变的线程安全 C# 与 Java 是一致的。
    nevermoreluo
        41
    nevermoreluo  
       2024-02-28 17:33:40 +08:00
    除了 Group 都是 int 或者 bool ,Group 不动的话 保证原子性应该就好了吧....
    svnware
        42
    svnware  
       2024-02-28 17:59:58 +08:00
    @namonai 不考虑数据一致性,允许脏读就无所谓了
    xumng123
        43
    xumng123  
       2024-02-28 19:05:58 +08:00 via iPhone
    已经是安全的了
    bthulu
        44
    bthulu  
    OP
       2024-02-29 08:26:35 +08:00
    @qping Thread0 会不依赖原有属性值更新所有 Device 的属性. MonitorThread 会读大部分的 devices, 并更新小部分 devices.
    m2276699
        45
    m2276699  
       2024-02-29 08:49:00 +08:00
    这样的业务应该用事件驱动
    johnnyyeen
        46
    johnnyyeen  
       2024-02-29 17:16:40 +08:00
    1 生产者对多消费者,给每个消费者一个队列。
    通过原子操作(信号或者锁)的方式保护生产者与消费者的竞争条件(我写数据你取走数据)。
    svnware
        47
    svnware  
       362 天前
    @me1onsoda 举个栗子,对硬件熟悉的话,机器字长多少,每次写入刚好是一个机器字长就不会被打断,肯定是安全的。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1533 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 16:59 · PVG 00:59 · LAX 08:59 · JFK 11:59
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.