opencv 使用OpevCV VideoWriter实时录制视频

v9tzhpje  于 2023-10-24  发布在  其他
关注(0)|答案(2)|浏览(160)

我有几个OpenCV项目,通过USB分析视频,在某些情况下必须将视频流记录到一个文件中。使用我的软件的人抱怨说,10分钟以上的记录产生的视频文件比他们应该长20秒左右。
我正在使用openCV的VideoWriter。我尝试了将CV2_CAP_PROP_FPS设置为非常低的设置,并尝试获得几秒钟内的平均帧速率,以找到适合我的输出文件帧速率的好设置。仍然不够接近我需要的真实的时间。
有没有人知道一个好的方法来确保我的视频记录接近真实的时间?我应该使用类似time.sleep(在python中)的东西来限制我的帧率?或者有更好的方法来做到这一点?

0ve6wy6x

0ve6wy6x1#

我写了一个实时跟踪器(用于神经生物学研究),我用它与许多网络摄像头,并注意到我的一些相机不坚持帧速率非常精确;它们可能有点快或慢。为了保存“合理正确”的视频,从网络摄像头捕获帧的代码调用我的VideoWriterwrite(frame)方法,该方法将帧放入队列中,并且帧由以恒定速率写入帧的单独的“写入器”线程检索。如果写入器线程在要写入下一帧时发现队列为空,则它重复最后一帧。如果写入器线程在队列中发现多于一个帧,它会写入最近的帧并丢弃其他帧。(可以在没有单独线程的情况下实现这一点,但对于实时跟踪器,让write(frame)快速返回是很好的。)
下面是我的代码摘录。它们不会逐字运行,但会显示我刚才描述的内容。(我计划在接下来的几周内将实时跟踪器放在GitHub上。)

class VideoWriter:

  def __init__(self, fn=None, fcc='MJPG', fps=7.5):
    ...
    self.fcc, self.fps, self.dt = fcc, fps, 1./fps
    self.q, self._stop, self.n = Queue.Queue(), False, 0
    self.wrtr = threading.Thread(target=self._writer)
    self.wrtr.start()

  def _writer(self):
    frm = firstTime = vw = None
    while True:
      if self._stop:
        break
      # get most recent frame
      while not self.q.empty():
        frm = self.q.get_nowait()
      if frm is not None:
        if vw is None:
          vw = cv2.VideoWriter(self.fn+self._EXT, cvFourcc(self.fcc),
            self.fps, imgSize(frm), isColor=numChannels(frm)>1)
          firstTime = time.time()
        vw.write(frm)
        self.n += 1
      dt = self.dt if firstTime is None else max(0,
        firstTime + self.n*self.dt - time.time())
      time.sleep(dt)

  # write frame; can be called at rate different from fps
  def write(self, frm):
    self.q.put(frm)
9fkzdhlc

9fkzdhlc2#

我更新了the code by Ulrich Stern,使它适用于我的OAK D相机,几乎不跳帧。他提到,代码不应该立即工作,但我仍然想指出,我们需要.release()cv2.VideoWriter得到完整的视频文件。

# https://stackoverflow.com/questions/35370360/recording-video-in-real-time-with-opevcv-videowriter
import threading
import cv2
import queue
import time

class ReasonableVideoWriter:

    def __init__(self, fn=None, fcc='MJPG', fps=30, res=(1280, 720)):
        self.fn = fn
        self.fcc, self.fps, self.dt = fcc, fps, 1./fps
        self.res = res
        self.q, self._stop, self.n = queue.Queue(), False, 0
        self.wrtr = threading.Thread(target=self._writer)
        self.wrtr.setDaemon(True)   # make the thread daemonic
        self.wrtr.start()
    

    def _writer(self):
        frm = firstTime = vw = None
        while True:
            if self._stop:
                self._stop = not self._stop
                break
            # get most recent frame
            while not self.q.empty():
                frm = self.q.get_nowait()

            if frm is not None:
                if vw is None:
                    vw = cv2.VideoWriter(self.fn, cv2.VideoWriter_fourcc(*self.fcc),
                        self.fps, self.res)
                    firstTime = time.time()
                vw.write(frm)
                self.n += 1

            dt = self.dt if firstTime is None else max(0,
                firstTime + self.n*self.dt - time.time())
            time.sleep(dt)

        vw.release()    # release the video file

    # write frame; can be called at rate different from fps
    def write(self, frm):
        self.q.put(frm)
    
    # stop
    def stop(self):
        self._stop = True

创建一个ReasonableVideoWriter对象,并使用.write(frame).stop()方法进行记录。

相关问题