pyspark和hadoop的逐帧视频处理

7eumitmz  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(470)

我想用spark和hadoop并行处理mp4视频帧。我不想在处理前提取所有帧。我要寻找的是一种方法,通过视频时间按顺序读取帧数据,然后在使用yarn的hadoop集群上的spark执行器中输入帧。mp4视频文件可以位于本地文件系统或hdfs上。
我可以使用ffmpeg创建一个管道,然后读取原始帧字节(例如,image=np.fromstring(pipe.stdout.read(192010803),dtype='uint8'))。是否有任何方法将数据(即,流,帧作为可变解码时间的函数传入)馈送到spark rdd,并且具有执行某些操作(例如,计算平均强度)的map函数?
我已经阅读spark文档很长一段时间了,在这个场景中找不到任何有用的东西。我可能因为树木而错过了森林。如果可以,请提供帮助,即使不使用ffmpeg和管道。

imzjd6km

imzjd6km1#

经过反复试验,我找到了一个可行的解决办法。虽然这可能不适用于所有人,但也可能对某些人有所帮助,因此:
我首先创建了一个从视频中提取帧的脚本,该脚本必须存在于所有工作节点上:


# !/home/hadoop/anaconda2/bin/python

import os
import sys
import subprocess as sp
import numpy as np
import cv2
import copy

# RDD.pipe sends via stdin

i = 0
try:
        i = input()
except:
    sys.exit()

file_name = 'v.mp4'
FFMPEG_BIN = "ffmpeg" # on Linux ans Mac OS
command = [ FFMPEG_BIN,
               '-i', '/home/hadoop/' + file_name,
               '-f', 'image2pipe',
               '-vf', 'select=gte(n\, %d)' % i,
               '-vframes', '1',
               '-pix_fmt', 'rgb24',
               '-loglevel', 'panic',
               '-vcodec', 'png', '-']
pipe = sp.Popen(command, stdout=sp.PIPE, bufsize=10**8)
data = pipe.stdout.read()
pipe.stdout.close()
import base64
print(base64.b64encode(data))

然后,在pyspark脚本中,我使用脚本参数创建rdd:

params = [str(i)  for i in range(1, 1001)]
rdd1 = sc.parallelize(params, numSlices=1000)
pipeRDD = rdd1.pipe('/home/hadoop/src/extract_frame.sh')
resizedRDD = pipeRDD.map(resizeMapper)
test = resizedRDD.collect()

测试现在有前1000帧。“调整大小”Map器调整每个帧的大小,如下所示:

def resizeMapper(x):
    import base64
    import cv2
    a = base64.b64decode(x)
    im = cv2.imdecode(np.fromstring(a, dtype=np.uint8), 1)
    im = cv2.resize(im, (200, 200))
    return im

我希望这能帮助别人。

相关问题