.Net 相关
线程 0 调用硬件异步 API, 拿到数据后, 从 devices 根据 id 取到 Device 实例, 更新硬件最新数据到这个实例上.
同时有多个监控线程每隔 100 毫秒读取一次所有设备状态, 并根据设备状态执行一次或多次耗时较长的异步操作, 并在异步操作执行完成后, 对硬件数据进行部分更新.
这个要怎么做才能确保线程安全?
// 设备集中存储处
ConcurrentDictionary<int, Device> devices = new();
// 设备类
public class Device
{
public int Id { get; init; }
public bool Enable { get; set; }
public string Group { get; init; } = "";
public int[] Locations { get; init; } = Array.Empty<int>();
public int Margin { get; set; }
public int RsCount { get; init; }
public bool EnableSplit { get; init; }
public int DynamicMerge { get; set; }
public int Width { get; set; }
public int Length { get; set; }
public int LeftLength { get; set; }
public int LoadEdge { get; set; }
public int Dest { get; set; }
}
// 数据更新线程相关
public Thread0Executor()
{
public async Task Execute()
{
var data = await GetDataFromHardwareApi();
Update(data, devices);
}
}
// 数据监控处理线程相关
public MonitorThreadExecutor()
{
public async Task Execute()
{
Resolve(devices);
await Operate0();
DoSomething();
await Operate1();
DoSomething();
}
public async Task Operate0()
{
try
{
await CallApi();
Update(devices);
}
catch()
{
UpdateIfError(devices);
}
}
}
异步方法中根本没办法使用锁, 顶多用用信号量 Semaphore 来代替锁.
这里也不能对整个 Execute 方法用锁. 因为监控线程中的异步操作耗时是不一定的, 可能因为网络问题花个几分钟都有可能.
貌似也没法仅对非异步代码进行加锁, 因为同步异步代码是混杂在一块的, 没法单独对非异步代码进行加锁.
也考虑过弄个类似 ANDROID 里的 UI 线程和子线程的东西, 数据读取和更新都放在 UI 线程里, 异步操作放在子线程里. 但是搞了半天没搞出来.
最后的最后, 实在没办法了, 我在想要不把 Device 的所有属性都加一个 volatile 关键字. 我这里更新数据的时候基本不会看原来数据是多少, 不会出现count++
这种情况, 貌似 volatile 是可行的. 但是实际这个 Device 有几十个属性, 并且有一两千个 Device, 如果每个属性都加一个 volatile 关键字, 那就是 2000*50=100 万个属性带 volatile 了. 这会不会极大地影响程序运行性能?
1
svnware 256 天前
单写多读不就已经是线程安全的了么。。。
|
2
wamson 256 天前 via iPhone
看标题,寻思,这不就是个读写锁么😳
|
3
laminux29 256 天前
你这不是一线程更新,多线程读,而是多线程读写。
这种问题,没把握的话,直接丢给 MSSQL ,如果对数据一致性要求严谨,用序列化级别的事务去操作数据。 如果要求不严谨,直接用 EF 的乐观锁或最终一致性。 |
4
wayne1007 256 天前
double buffer ,写线程 先 load 数据,然后和更新 buffer 的 idx 0->1 或者 1->0
|
7
namonai 256 天前
@svnware 不一样的。比如一段数据,写入一半的时候被读取,读到的就是 broken 的数据。哪怕是对单个字节进行读写操作,也可能存在问题,所以至少要使用原子操作进行保护。
|
8
codcrafts 256 天前
我没太懂,你这种情况下会有线程安全问题吗?我感觉不会
|
11
guo4224 256 天前
临界区……
|
12
namonai 256 天前 1
@bthulu 你可以试试 trible buffer ,编号 0 、1 、2 ,读线程实现一个 getIndex(),初始的 valid index 是 0 ,需要对数据更改的时候,index + 1 ,往 1 上写,写完了以后 valide index 也更新到 1 ,这个时候 0 和 1 的数据都是有效的,过了一小段时间,0 就没人访问了。在这段时间里如果又有需要写入的数据,那就往 2 上写。这样子可以始终保证读到的数据是完整的。可以把写入操作放在一个单独的线程里进行,其他线程如果有修改数据的需要,就通过队列传递数据过去。
|
13
billccn 256 天前
`devices`这个字典一定程度上就是一个手搓的数据库,你这个里面要考虑的情况很多,比如:
1. 字典需要动态增删吗?不需要的话这个`Concurrent`是徒增开销 2. 字典里面的值(就是每个 Device 实例)会被多个线程同时引用吗?字典的`Concurrent`是不会管里面的值是不是 thread-safe 3. Device 实例需要 referential equivalence 吗?不需要的话建议把这个类变成只读的,每次更新的时候直接替换整个实例最安全 4. Device 与 Device 之间有关系吗?有的话你可能需要考虑如何 atomically 更新这个字典 |
15
geelaw 256 天前 via iPhone
>异步方法中根本没办法使用锁, 顶多用用信号量 Semaphore 来代替锁.
>貌似也没法仅对非异步代码进行加锁, 因为同步异步代码是混杂在一块的, 没法单独对非异步代码进行加锁. 规则是 lock 里面不可以有 await (可以实现,但是几乎总是错误的,因此语言层面拒绝这样做),在 async 方法中 lock 是完全 OK 的。 ConcurrentDictionary 已经确保每次访问它的成员都是原子的,然而这不代表对它的访问逻辑就已经线程安全,比如一段代码里连续访问它的成员两次,那么在中途其他线程可能已经修改过了这个字典。说这点是预防针,楼主在 #14 提到这是为了确保 Device 存在 devices 里面的引用安全。 要保证每个 Device 实例线程安全,最简单的思路是细粒度,比如操作每个 device 的时候 lock 之。如果操作过程需要异步,那么我想象中楼主说有多个线程查看 devices 并做一些事,意思是如果 A 线程处理了 device1 则 B 线程应该跳过并处理 device2 ,这种情况下因为 device 被占用时无需等待,所以可以用 interlocked operation 实现: 1. 在 device 上加上一个 int 字段 InUse ,表示目前是否在处理它,初始化为 0 。 2. 要访问一个 device ,先用 Interlocked.Exchange 查看 InUse 并设置为 1 ,如果 InUse 之前也是 1 ,则跳过。 3. 否则 InUse 之前是 0 并且被原子设置为 1 ,此时当前方法认为自己接管该 device 并开始异步硬件 API 操作,在 await 结束、处理完 device 后,重新 Exchange 把 InUse 还原为 0 。 |
16
dogfeet 256 天前
如果更新的时候不看原来的数据,且 [多个监控线程每隔 100 毫秒读取一次所有设备状态, 并根据设备状态执行一次或多次耗时较长的异步操作] 这个过程中数据变化了也没关系的话,可以考虑直接将 Device 变为不可变(所有字段都 readonly)。
C# 不是特别熟了,devices 本身读写是线程安全的,里面的 device 只要每次更新的时候是替换一个新的不可变对象,这在 java 中是线程安全的。 几十个字段的拷贝,应该也还好。 |
18
zzzyk 256 天前
无锁队列看行不行。
|
19
CLMan 256 天前 1
这个问题的核心是你业务逻辑的“线程安全”是如何定义的(只有你自己知道),至于是用锁、读写锁、Semaphore 、无锁、volatile 等,这些纯粹是实现细节,取决于你对并发相关基础知识(操作系统领域)以及特定语言(这里是.NET )相关库和语法的熟悉程度。
由于不了解你的业务逻辑实现细节,我只能提问: - 线程 0 是只写吗,是否依赖 Device 当前的状态? - 监控线程统计所有设备状态时,以及执行异步操作时,是否允许线程 0 进行更新? - 监控线程的异步任务与线程 0 是否存在写入相同的内存区域的情况? - 监控线程的异步任务是否可能执行超过 100 毫秒,如果超过,是否允许多个监控线程的异步任务同时执行?如果允许,它们的写是否冲突? 你至少需要补充以上细节,才能让回答者更好的帮你解决问题。 |
20
iceheart 255 天前 via Android
多个副本数据策略。
属性数据放两个以上副本,由一个 volatile 索引指定最新副本。 写线程更新副本后再更新索引。 读线程按索引访问副本数据。 |
21
bthulu OP @CLMan 线程 0 只写, 不依赖 Device 当前的状态
监控线程执行异步操作时,允许线程 0 进行更新 监控线程的异步任务跟线程 0 写入的就是相同的内存区域 监控线程的异步任务是轮询执行的, 执行完毕后等 100 毫秒再次执行,且执行时间可能长达几分钟。允许多个监控线程的异步任务同时执行。他们的写存在冲突。 |
22
xuanbg 255 天前
这……单写不是已经线程安全了么?看内容貌似又不是,OP 还是直接说需求吧,这问题都说不清楚,实在让人挠头。
|
23
zzl22100048 255 天前
|
24
layxy 255 天前
又不是多写,单写多读没啥线程安全问题吧
|
25
1008610001 255 天前
看描述。。。只有一个线程负责写数据 不存在线程安全的问题啊
|
26
lakehylia 255 天前
简单点,直接用事务线程不行么?其他多个线程都是提交事务给事务线程负责读写,然后事务线程回调结果。
|
27
4kingRAS 255 天前
读写操作是原子的吗?原子的,一个线程写根本没多线程问题
如果不是原子的,先尝试做到原子,做不到就读写时加锁 |
28
wu00 255 天前
是不是想太多了?
ConcurrentDictionary 本就是线程安全集合,TryAdd(),TryUpdate()都是原子操作。 所以就算你 Thread0 、Monitor1 、Monitor2 三个线程并发 ConcurrentDictionary 进行操作,也不会出现线程安全问题;会出现的是你业务上的“线程安全”问题:到底谁的优先级更高? |
29
cloud107202 255 天前
这里可以考虑做个线程读写分离。没接触过 .Net 我会用 Java 的 type 与 API 描述,自行对应一下:
首先把成员 devices 与相关的操作都封装到一个类型里面,对外暴露一个 public 的阻塞队列成员变量,Java 的话我会用有阻塞语义的 ArrayBlockingQueue. 这个类型在构建的时候(onCreation),启动一个单线程去 poll 这个 Queue. devices 的更新逻辑都由这个单线程完成 外面的异步操作获取到设备信息后,以 ImmutableEvent 的形式把必要的信息封装描述好,放入队列. 形如 ArrayBlockingQueue<DeviceUpdatedEvent> 这样子,里面的单线程 poll 到事件直接更新 Dictionary 即可。 最后剩下这个“多个监控线程每隔 100 毫秒读取一次所有设备状态” ,这里简单起见可以将 devices 也设置成 public ,直接在外面访问 devices 成员(重点是:一定要约定好,在 poll 的线程之外的逻辑,全部只能 read 这个 ConcurrentDictionary )。因为 Dictionary 本身使用了线程安全的 ConcurrentDictionary ,对它的 CRUD 是线程安全的,只需要防止外面监控程序获取到某个尚未更新完成的某个 Device 实例(有点像 DB 的脏读),这里给 Device 每个属性设置 volatile 肯定是不合适的:可以考虑前面提到的,在负责 poll 的单线程,获取到更新事件后,不要就地改变 device 对象本身的属性值,而是以 deepCopy 的方式创建个全新的 Device 实例。然后用 ConcurrentDictionary.put(key, value) 的 API 直接更新整个 Device 对象,规避外部监控线程在 scan 的时候,获取到属性更新不完整的 stale state |
30
jones2000 255 天前
奇偶读写,2 个内存块( 0 号,1 号),0 号写的时候,1 号读。1 号写的时候,0 号读。
|
31
dode 255 天前
调整锁的粒度
|
32
liuky 255 天前
使用阻塞队列 BlockingCollection 试试,
|
33
qping 255 天前
我感觉 27 楼说的做到写原子操作就可以了
Device 应该是一个 immutable 得对象,不可变 想要更新只能 clone ,然后 update 到字典中 |
34
sparklee 255 天前
单个线程更新, 所有需要更新的操作都做成 任务 都放到任务队列
|
35
yansideyu 255 天前
楼主的问题是所有线程更新数据的时候,需要更新多个属性,怎么避免没有全部更新完的情况下,其他线程读取了数据。拿到了脏数据?
|
36
i8086 255 天前
楼主意思应该是多线程更新集合里 Device 类型属性值的问题?
用 volatile 就好了,目前是最方便。 |
37
qping 255 天前
又仔细看了下,你是多线程写啊,MonitorThread (多个)和 Thread0 都能更新, 那存在一些问题
1. MonitorThread 和 Thread0 是否会写入冲突 如果 MonitorThread 和 Thread0 写入相同得内存,那感觉就是设计有问题 那我假想他们不会冲突 2. 多个 MonitorThread 冲突的问题 多个 MonitorThread 每次都更新全部的 devices ,这个设计也很奇怪 假设已经做到通过锁或其他手段,保证一个 MonitorThread 更新是原子级别的。 MonitorThread A 先启动,MonitorThread B 后启动,因为等待时间长 A 的结果却比 B 后写入,这样没有问题吗? 我觉得,应该可以有多个 MonitorThread 线程,但是每个 Device 只能同一时间被一个 MonitorThread 更新 实现方法上,可以用队列,每次更新 MonitorThread 从队列中取一个 Device ,如果更新完重新还回 |
38
yicong135 255 天前
ReaderWriterLockSlim 读写锁
https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.readerwriterlockslim?view=net-8.0 |
39
shapper 255 天前
task 本身就是开新线程,减少锁粒度,锁 devices 就可以,把具体 device 分配到 task ,task 只修改自己引用的 device ,不修改 devices ;
|
40
dogfeet 255 天前
@bthulu 看起来就是写不依赖读,或者说写需要的读状态可以是旧数据(只需完整,无需最新)。那么单纯的将 Device 变为不可变就行。ConcurrentDictionary 单纯的读写本身是原子的,查了一下,不可变的线程安全 C# 与 Java 是一致的。
|
41
nevermoreluo 255 天前
除了 Group 都是 int 或者 bool ,Group 不动的话 保证原子性应该就好了吧....
|
43
xumng123 255 天前 via iPhone
已经是安全的了
|
44
bthulu OP @qping Thread0 会不依赖原有属性值更新所有 Device 的属性. MonitorThread 会读大部分的 devices, 并更新小部分 devices.
|
45
m2276699 254 天前
这样的业务应该用事件驱动
|
46
johnnyyeen 254 天前
1 生产者对多消费者,给每个消费者一个队列。
通过原子操作(信号或者锁)的方式保护生产者与消费者的竞争条件(我写数据你取走数据)。 |