func (a *Agent) readFileEvents(writer *io.PipeWriter) {
defer writer.Close()
w := bufio.NewWriter(writer)
defer w.Flush()
ioBuff := make([]byte, 131072)
log := a.logger.WithField("component", "readFileEvents")
var buff bytes.Buffer
vw, err := bsonrw.NewBSONValueWriter(&buff)
if err != nil {
a.logger.WithFields(logrus.Fields{
"error": err.Error(),
}).Error("Failed to create bson value writer")
return
}
encoder, err := bson.NewEncoder(vw)
if err != nil {
a.logger.WithFields(logrus.Fields{
"error": err.Error(),
}).Error("Failed to create bson encoder")
return
}
var gzBuff bytes.Buffer
zw, _ := flate.NewWriter(&gzBuff, 3)
handleEvent := func(event *fsnotify.Event) {
a.logger.Debug("handle event")
gzBuff.Reset()
zw.Reset(&gzBuff)
var entry *EventInfo
a.metrics.eventsCounter.WithLabelValues(event.Op.String()).Inc()
log.WithFields(logrus.Fields{
"filename": event.Name,
"type": event.Op.String(),
}).Debug("event")
defer buff.Reset()
fileInfo, err := os.Stat(event.Name)
if err != nil {
log.WithFields(logrus.Fields{
"filename": event.Name,
"error": err.Error(),
}).Debug("Failed to get file info")
return
}
if fileInfo.Mode()&fs.ModeSymlink == fs.ModeSymlink {
return
}
result := a.fileFilter(event.Name)
if !result {
return
}
file, err := os.Open(event.Name)
if err != nil {
log.WithFields(logrus.Fields{
"error": err.Error(),
"filename": event.Name,
}).Error("Failed to open file")
return
}
defer file.Close()
entry = eventInfoPool.Get().(*EventInfo)
defer eventInfoPool.Put(entry)
entry.ClientId = a.uuid
entry.FilePath = event.Name
if _, err = io.CopyBuffer(zw, file, ioBuff); err != nil {
a.logger.WithFields(logrus.Fields{
"error": err.Error(),
"filename": event.Name,
}).Error("Failed to read file")
}
if err := zw.Close(); err != nil {
log.WithFields(logrus.Fields{
"error": err.Error(),
}).Error("Failed to compress file")
return
}
entry.FileContent = gzBuff.Bytes()
err = encoder.Encode(entry)
if err != nil {
a.logger.WithFields(logrus.Fields{
"error": err.Error(),
"filepath": entry.FilePath,
}).Error("Failed to serialize object")
return
}
if _, err = w.Write(buff.Bytes()); err != nil {
a.logger.WithFields(logrus.Fields{
"error": err.Error(),
}).Error("Failed to transfer data")
return
}
a.metrics.eventsSentCounter.WithLabelValues(event.Op.String()).Inc()
a.metrics.fileCounter.Inc()
}
for {
select {
case event, ok := <-a.watcherChan:
if !ok {
return
}
handleEvent(&event)
case <-a.ctx.Done():
return
}
}
}
使用 pprof 发现 flate.NewWriter 方法消耗了大量内存。
这张图是 inuse_space:
这张图是 alloc_space:
这张是 ReadDir:
顺便吐槽下 os.(*File).ReadDir 内存占用也很离谱。 有没有大佬提供下优化思路 qaq
1
darrh00 2021-09-09 20:52:44 +08:00
大致看了一下,你这里的 entry 完全没有必要用 sync.Pool 池吧,应该可以直接分配在栈上,不会有什么 GC 压力,反而你这个 EventInfo 带有个大的属性 FileContent 的引用,你把 entry 放进池中的时候并没有把 FileContent 引用清掉,导致放进池中的 entry 仍旧引用着 FileContent 这种可能很大的[]byte, 导致 GC 无法回收。
|
2
lysS 2021-09-09 21:14:38 +08:00
这个 profile 不是 CPU 的图嘛?
|
3
Nitroethane OP |
4
darrh00 2021-09-09 22:49:11 +08:00
@Nitroethane
buff.Reset 是复位自己啊,FileContent 又不会 reset,看看以下会输出什么? func f() { var buf bytes.Buffer io.WriteString(&buf, "Hello") r := buf.Bytes() buf.Reset() fmt.Println(string(r)) } |
5
Nitroethane OP @darrh00 #4 我这句话是有问题…… 不过不用 sync.Pool 的话问题还存在……
|
6
GopherDaily 2021-09-11 00:44:54 +08:00
@Nitroethane 你的 readFileEvents 是不是被调用了很多次,导致 writer 被初始化了很多次
|
7
Nitroethane OP @GopherDaily #6 不是,readFileEvents 方法是长期运行的,被调用很多次的是 handleEvent 函数。
其实问题出在 gzBuff 和 zw 两个变量上。这两个变量的生命周期和 readFileEvents 方法是相同的。假如随着程序的运行,要处理的文件越来越大,那么 gzBuff 和 zw 这两个变量的底层 byte slice 也会越来越大,而且不会被 GC 回收,byte slice 也不会自动收缩。所以随着运行时间内存使用量会持续增长。 |
8
Nitroethane OP |
9
Nitroethane OP 其实最优解应该是根据文件大小选择最合适的 bytes.Buffer,但是 sync.Pool 不支持这种操作。如果自己手动先 get,判断 buffer 大小再 put 的话,感觉会影响 GC 导致更严重的性能问题
|
10
Nitroethane OP @Nitroethane #8 突然发现有个致命 bug,不应该在 compressFile 函数里就把 buff 给 Put,应该在 handleEvent 的 return 语句前面用 defer 给 Put 掉
|
11
Nitroethane OP |
12
Nitroethane OP |