最近在做关于实时日志预警方面的内容,其中一个重要需求就是从日志文件中实时向 Kafka 写。所以随着日志文件的实时增加,如何才能做到 准确并完整地写入呢?下面几个是最近写的一些内容,和大家讨论一下哈:
① 直接 tailf -F | ./kafka-producer.sh ,但因为需要对生成的日志文件进行一些处理,并且用 shell 的话不方便维护,所以弃掉。
接下来的几种方法里采用的是 pykafka 作为客户端:
② 用 subprocess.Popen
调用 tail -F 命令。但 tail -F 和 wc -l / ls 等一次性命令不同,在设置了 stdout=PIPE 的情况下,貌似无论是用 communicate() 还是 read() ,都无法
做到随着日志文件的增加来显示出更新的内容。。。所以也弃掉。
③ 用 Popen 来调用 wc -l
命令,同时定义一个 global pre_line_count 变量,每次将最新读取到的行数和 global pre_line_count 做比较(同时更新 pre_line_count ),用 f.seek()
的方法读取行数。这种方法虽然也能运行,但运行效率太低,有些部分太 tricky 了。
④ 最后采用的是这种方法,直接用 shell 执行 tail -F logfile | nc 7777
,用 netcat 工具把内容通过 7777 端口发过去,然后 Python 再启一个 server 端,用 socket 来接受 7777 端口传过来的内容。测试下来这种方法好像还不错。。。⊙﹏⊙b 汗 估计就要用这种方法了,再加上一个 supervisor 做守护。
Google 上搜出来的方法还有一种是用 logtash ,用的是写的一个 Nginx module ,但我们要处理的不仅仅是 Nginx 的日志文件。。。还有许多其他类型的,所以这种方法也只能弃掉。
呜呜,不知道大家写的时候都有什么好的办法嘛??。。
1
huangzxx 2016-10-24 10:38:56 +08:00 1
ELK stack 有一整套的,抓文件的 filebeat ,抓网络的 packetbeat ,可以看看。
logstash 支持的 input 很多,应该满足你的需求。 |
2
Allianzcortex OP @huangzxx 汗。。应该是 logstash 的。之前也想过这个,但做这个定位希望是轻量级,可扩展的~~ ELK 一系列产品 JVM 系有些大材小用的感觉。。至今对 Elastic 的内存占用心有余悸。。。
|
3
huangzxx 2016-10-24 11:36:01 +08:00
beat 系列是 go 的
|
4
iyaozhen 2016-10-24 11:50:00 +08:00 via Android 1
elk 有一整套。
推荐使用 filebeat(线上日志收集)+logstash(线下接收集群)。线下的 logstash 主要做负载均衡还有一点简单的逻辑,比如不同类型日志放不同 kafka topic 。 你说的 Python subprocess tail -f 也可以,但是需要处理负载均衡,断线重连,比较麻烦。之前写了个 Python 版的 filebeat ,仅供参考: https://github.com/iyaozhen/filebeat.py 已经小范围用在生产环境了。代码还有个 v2 的分支,能同时处理多个文件。 |