手头上有几十个 rtsp 链接,需要接受所有的 rtsp 视频流,并进行相应的处理。现有是使用 ffmpeg-python 库来接受 rtsp 流。 由于没有什么好的方法,所以只能用 python 的 Process 对象来管理。
class RTSPStream(Process):
def __init__(self, queue, stream_id, stream_url):
Process.__init__(self)
self.q = q
self.stream_id = stream_id
self.stream_url = stream_url
def get_hnw(self, url):
probe = ffmpeg.probe(url, rtsp_transport='tcp')
video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
width = video_stream['width']
height = video_stream['height']
return height, width
def run(self):
h, w = get_hnw(self.stream_url)
assert h != 0
assert w != 0
out = (
ffmpeg
.input(url, rtsp_transport='tcp')
.output('pipe:', format='rawvideo', pix_fmt='bgr24', loglevel="quiet", r=1)
.run_async(pipe_stdout=True)
)
cnt_empty = 0
while True:
in_bytes = out.stdout.read(h*w*3)
if not in_bytes:
cnt_empty += 1
if cnt_empty > 10:
break
sleep(0.1)
continue
cnt_empty = 0
frame = np.frombuffer(in_bytes, dtype=np.uint8).reshape(h, w, 3)
data = {
'stream_id': self.stream_id,
'origin': frame
}
if not queue.full():
self.queue.put(data)
class DoSth(Process):
def __init__(self, queue):
Process.__init__(self)
self.queue = queue
def run(self):
while True:
if queue.empty():
sleep(0.05)
continue
data = queue.get()
# do sth to ndarray
我会初始化一个 DoSth 进程对象,并根据不同的 rtsp 链接初始化多个 RTSPStream 进程对象。两种进程对象之间通过进程安全的 queue 连接。
这种方法,不太优雅,但没有 在处理 25 个 rtsp 流的时候还能正常运行,但当数量增加到 28 左右时,就无法达到预期:CPU 爆增至 100%、内存无限制增长……
所以,我应该用什么方法来同时接受这么多个 RTSP 视频流呢?
1
eojessie 2020-07-22 15:39:27 +08:00
楼主有好的方法管理多个 ff 进程了么?
|
2
mrhhhdx 2022-09-09 23:08:41 +08:00
减小 buffer 或者直接 nobuffer ,以及每一帧用完使用 del( ) ,或许能改善内存增长问题
|