var fmutex = sync.Mutex{}
var A = make(chan string, 1048576)
var B = make([]flowStatistic,0)
func foo(){
go getData()
go txData()
go handleData()
}
//接受数据
func getData(){
for {
...
data := conn.Read()
A<-data
}
}
func txData(){
for{
var fs flowStatistic
err := json.Unmarshal([]byte(<-A), &fs) //这里不断解析A传过来的数据
...
B = append(B,fs) //仅仅在这里会插入B
}
}
func handleData(){
//这里每5秒钟对B中的数据进行聚合并入库,耗时较多。为了不丢数据,我锁住B,处理完后清空B中数据并解锁
for{
time.Sleep(5 * time.Second)
fmutex.Lock()
...
B = make([]flowStatistic,0)
fmutex.Unlock()
}
}
有老哥提到了append B的时候应该锁住,我试了下发现实际上还是会丢数据。
var fmutex = sync.Mutex{}
var A = make(chan string, 1048576)
var B = make([]flowStatistic,0)
func foo(){
go getData()
go txData()
go handleData()
}
//接受数据
func getData(){
for {
...
addr, err := net.ResolveUnixAddr("unixgram", sock)
conn, err := net.ListenUnixgram("unixgram", addr)
data := conn.ReadFromUnix()
A<-data
}
}
func txData(){
for{
var fs flowStatistic
err := json.Unmarshal([]byte(<-A), &fs) //这里不断解析A传过来的数据
...
fmutex.Lock()
B = append(B,fs)
fmutex.Unlock()
}
}
func handleData(){
//这里每5秒钟对B中的数据进行聚合并入库,耗时较多。为了不丢数据,我锁住B,处理完后清空B中数据并解锁
for{
time.Sleep(5 * time.Second)
fmutex.Lock()
...
B = make([]flowStatistic,0)
fmutex.Unlock()
}
}
改进了一下
1
RH 2023-05-08 07:58:19 +08:00
需要 demo 才能分析,你描述的逻辑里有很多不确定性。
|
2
lrh3321 2023-05-08 08:18:57 +08:00 via Android
要 demo, 你两个协程都会写切片 B
|
4
piaodazhu 2023-05-08 08:54:24 +08:00 1
我就提一个可能,切片 B 扩容,导致这种特殊情况:
时刻 1 ,goroutine1 加锁,用 B=append(B, item)向切片 B 追加一个元素。刚好触发了扩容,B 的底层数组指针发生了转移。即,append 的参数 B 和返回值 B 中的 ptr 不同。 时刻 2 紧接着时刻 1 ,goroutine2 拿到锁,这个时候在 goroutine2 看来,B 只是一个由(size,cap,ptr)构成结构体,它察觉不到 B 底层数组指针的变化,所以看不到 goroutine1 追加的数据。 具体可以检查一下代码。 |
5
xuboying 2023-05-08 09:04:24 +08:00
我感觉 sync.Pool 是干这个事情的。但是我一直没有掌握 sync.Pool 的正确用法,希望有大佬解释一下。
|
6
ding2dong 2023-05-08 09:04:28 +08:00
调大 A 的 bufsize
|
7
ding2dong 2023-05-08 09:05:47 +08:00
另外写入 B 的时候也要 mutex ,否则会被污染
|
8
lysS 2023-05-08 09:07:39 +08:00
“但是现在会出现丢数据的情况” 这是为什么呢?实际没有从数据源中获取到 1w 条?
|
9
liangkang1436 2023-05-08 09:08:47 +08:00 via Android
有没有考虑用时序数据库来存储这些数据然后订阅? 1w/s ,这个数据量不小了
|
10
fregie 2023-05-08 09:21:16 +08:00 via Android
2 中存入 b 的过程也要锁。
其实这里不用切片用队列比较合适 |
11
ghost024 2023-05-08 09:21:48 +08:00
没看到你的代码,粗略的分析,你的第一个协程,从 channelA 中写到切片 b 也需要先获得 b 的 mutex 锁的,要不然,如果在锁 b 的时候你从 channelA 中获取数据,因为 b 锁住了,你写不进去就丢了
|
12
WispZhan 2023-05-08 09:28:28 +08:00 1
The Golden Rule - Don't Block the Event Loop or Coroutine.
|
13
Martens 2023-05-08 09:32:34 +08:00
写切片 B 和读切片 B 的时候都要加锁
|
14
dode 2023-05-08 09:40:47 +08:00
先放 kafka ,再批量读出来处理呢
|
15
8355 2023-05-08 09:42:27 +08:00
|
16
joesonw 2023-05-08 09:52:00 +08:00 via iPhone
能确定数量就用 channel ,不行的话用 linked list 。尽量避免用锁,传递锁的时候要传指针&。
|
17
matrix1010 2023-05-08 10:09:41 +08:00
丢数据算 bug 吗? 如果算请写个并发的单元测试并加上-race 测一下
|
18
Nazz 2023-05-08 10:19:36 +08:00
数据量有点大, 建议使用 sync.Pool + 任务队列
|
20
swqslwl OP |
21
leonshaw 2023-05-08 11:24:30 +08:00
conn 是什么协议?
把加放锁和处理数据的位置再标一下。 |
22
rrfeng 2023-05-08 11:37:29 +08:00
channel 里读 N 条出来直接处理掉,不要用切片缓存 /交互数据,就没这个问题了。
这个切片设计的根本没什么道理。 |
23
pkoukk 2023-05-08 11:39:19 +08:00
没理由 append B 加锁了还能丢数据啊
你可能丢数据的地方在 err := json.Unmarshal([]byte(<-A), &fs) |
24
oldshensheep 2023-05-08 11:53:53 +08:00 via Android
看你最终的代码感觉没什么问题。
建议写个可以复现的 demo ,之前我也是出 bug ,感觉是用的第三方的库的问题。后来写了个可以复现的 demo ,发现是我代码的问题。 我有很多莫名其妙的 bug 都是在写 demo 的时候发现代码真正错误的地方。 比如说你这个代码,里面有网络连接,写数据库啥的,都给简化了,最终就是纯粹的逻辑代码,慢慢调试就发现问题了。 而且也方便别人运行调试。 |
25
ns09005264 2023-05-08 11:55:40 +08:00
handleData 里加锁处理数据,但是 txData 里 append 却没有加锁,
所以当 handleData 正在处理数据的时候,txData 还在往里面 append 数据, 等 handleData 处理完,清空了 B ,txData 在 handleData 处理数据的过程中所添加的数据也就被清除了。 没有给写入加锁只给读取加锁,等于没加锁。 另外你想用 handleData 异步处理数据,但是如果在 txData 里给 append 加锁,其实就等于同步处理数据了,没什么意义。考虑在 txData 里对数据进行分块或按时间进行分块,再将分块的数据传给 handleData ,连锁都不用。 |
26
8355 2023-05-08 12:08:29 +08:00
我的理解 handleData 这里完全没必要 也没必要用锁
可以把写库代码直接放到 appendB = append(B,fs) 位置执行 其次 db 本身是支持并发写库的,这里加锁意义不大,加了锁也都是在等待锁反而更慢 |
27
leonshaw 2023-05-08 13:47:13 +08:00
检查一下发送端的返回值。
如上面所说的,这样实现并没有并发。如果处理能力大于上游,同步处理就行;如果小于上游,最终结果就是一个协程在处理,一个在等锁,一个在等 channel 缓冲空间。 |
28
reliefe 2023-05-08 14:25:00 +08:00
这个问题根本应该在于多个线程操作同一个切片导致的,这里就会有很大不确定性。我问了 GPT-4 ,它给了很好的建议,把 B 换成 chan 而不是切片试试
``` var A = make(chan string, 1048576) var B = make(chan flowStatistic, 1048576) // 使用带缓冲的 channel 而非切片 ... func txData() { for { var fs flowStatistic err := json.Unmarshal([]byte(<-A), &fs) ... B <- fs // 将 fs 传递给 handleData } } func handleData() { var buffer []flowStatistic timer := time.NewTimer(5 * time.Second) for { select { case fs := <-B: buffer = append(buffer, fs) case <-timer.C: // 处理 buffer 中的数据 ... buffer = make([]flowStatistic, 0) timer.Reset(5 * time.Second) } } } ``` 完整回复: https://flowus.cn/share/533684c0-2869-4507-8375-297103f09c77 PS: 顺便一提在我的小站就可以随时用 GPT-4 了, liaobots.com |
29
quzard 2023-05-08 14:39:20 +08:00
```go
var fmutex = sync.Mutex{} var A = make(chan string, 1048576) var B = sync.Pool{ New: func() interface{} { return make([]flowStatistic, 0, 10000) }, } func foo(){ go getData() go txData() go handleData() } // 接收数据 func getData(){ for { // ... data := conn.Read() A<-data } } func txData() { for { var fs flowStatistic err := json.Unmarshal([]byte(<-A), &fs) // ... fmutex.Lock() currB := B.Get().([]flowStatistic) currB = append(currB, fs) B.Put(currB) fmutex.Unlock() } } func handleData() { for { time.Sleep(5 * time.Second) fmutex.Lock() currB := B.Get().([]flowStatistic) // 进行数据聚合和存储操作 // ... // 清空 B currB = currB[:0] B.Put(currB) fmutex.Unlock() } } ``` |
31
Anivial 2023-05-08 15:21:25 +08:00
感觉可以换一种思路,通过 time.Ticker 和 select 来代替锁保证缓存数据不会被互相抢占影响
for { select { case data := <-A: ... B = append(B,fs) case t := <-ticker.C: // ticker := time.NewTicker(5 * time.Second) // 聚合处理数据 process(B) // 清空 B 保留容量 B = B[:0:cap(B)] } } |
32
piaodazhu 2023-05-08 15:28:16 +08:00
在楼主给的第二份代码其实也没有解决上面我提的那个问题,因为 goroutine1 在等待 goroutine2 放锁的时候,它栈里面的变量 B 就是旧的 B (底层指针不会变成你清空后新赋值的指针),所以 goroutine2 的清空操作 goroutine1 在这一次执行中是不可见的。
试试这样修改? ``` var fmutex = sync.Mutex{} var A = make(chan string, 1048576) var B_array = make([]flowStatistic,0) // <------ var B = &B // <------ func foo(){ go getData() go txData() go handleData() } //接受数据 func getData(){ for { ... addr, err := net.ResolveUnixAddr("unixgram", sock) conn, err := net.ListenUnixgram("unixgram", addr) data := conn.ReadFromUnix() A<-data } } func txData(){ for{ var fs flowStatistic err := json.Unmarshal([]byte(<-A), &fs) //这里不断解析 A 传过来的数据 ... fmutex.Lock() *B = append(*B,fs) // <------ fmutex.Unlock() } } func handleData(){ //这里每 5 秒钟对 B 中的数据进行聚合并入库,耗时较多。为了不丢数据,我锁住 B ,处理完后清空 B 中数据并解锁 for{ time.Sleep(5 * time.Second) fmutex.Lock() ... *B = make([]flowStatistic,0) // <------ fmutex.Unlock() } } ``` 感觉大概率是这里的问题 |
33
piaodazhu 2023-05-08 15:43:16 +08:00
@piaodazhu 不好意思看错了,B 不在栈上,上面这个请忽略。。。
另外,在 handleData()里面,可以在加锁之后: fmutex.Lock() tmp := B B = make([]flowStatistic, 0) fmutex.Unlock() ... // processing tmp 可以减少加锁时间,看不能减少或者消除数据丢失? |
34
PythonYXY 2023-05-08 16:05:37 +08:00
数据量也不小了,感觉还是上 Flink 吧,基于滚动窗口+RocksDB 状态后端做实时分析。
|
35
picone 2023-05-08 16:44:18 +08:00
感觉代码没有问题,但是有些能优化的地方,可以改成无锁化
```go func txData() { ticker := time.NewTicker() for { select { case <- ticker.C: go func() // report your data B = make() case evt <- A: B = append(B, evet) case <-ctx.Done(): return } } } ``` |
36
liuxu 2023-05-08 17:19:19 +08:00
第二条附言的代码应该没问题了,golang 所有基础类型都不是线程安全的,txData()在不断自动扩容 B ,而 handleData()拿到的是旧指针,处理完旧指针的数据清空新 B 指针,导致了旧指针和新 B 指针这段时间 append()的数据丢失
第一个附言等于没锁,handleData()内部没有线程安全问题,是单线程的,竞态出在 txData()的 append()和 handleData()的 B = make([]flowStatistic,0)之间 |
37
ccde8259 2023-05-08 20:33:03 +08:00 via iPhone
这种地方 mutex 写 slice 不如写 chan……
|
38
doraf 2023-05-09 09:43:00 +08:00
如果还有问题,能不能试试 atomic.Value 来存取 B 。
txData 和 handleData 之间,能不能使用 chan 来传递 flowStatistic 。 5 秒处理一次的话,在 txData 缓存数据,每 5 秒调用一次 go handleData 行不行(传递缓存数据给 handleData ),不知道语义还对不对。 要不要考虑考虑 kafka 、flink 这种。 |
39
xurh 2023-05-09 11:03:17 +08:00
我之前做爬虫收集数据也遇到过类似的问题,把数据聚合进行批量插入减少 io 。
我采用的 chan ,然后启动一个协程监听 chan ,当收集一定数量的数据或者时间满足,就把数据写入 db ```go type DBWriter[T any] struct { Size int Interval time.Duration done chan struct{} ch chan T insertDB func([]T, int) } func (w *DBWriter[T]) Start() { ticker := time.NewTicker(w.Interval) records := make([]T, 0, w.Size) insert := func() { if len(records) == 0 { return } w.insertDB(records, w.Size) records = make([]T, 0, w.Size) } for { select { case <-w.done: insert() return case <-ticker.C: insert() case data := <-w.ch: records = append(records, data) if len(records) == w.Size { insert() } } } } ``` |