hadoop—是否可以使用ApacheSpark读取pdf/音频/视频文件(非结构化数据)?

bqf10yzr  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(378)

是否可以使用apachespark读取pdf/音频/视频文件(非结构化数据)?例如,我有数千张pdf发票,我想从中读取数据并对其进行分析。我必须执行哪些步骤来处理非结构化数据?

z6psavjg

z6psavjg1#

是的,是的。使用 sparkContext.binaryFiles 以二进制格式加载文件,然后使用 map 将值Map到其他格式—例如,使用ApacheTika或ApachePOI解析二进制文件。
伪代码:

val rawFile = sparkContext.binaryFiles(...
val ready = rawFile.map ( here parsing with other framework

重要的是,解析必须使用前面我的答案中提到的其他框架来完成。map将获取inputstream作为参数

yqkkidmi

yqkkidmi2#

我们有一个需要对输入文件使用自定义解密算法的场景。我们不想用scala或python重写这些代码。python spark代码如下:

from pyspark import SparkContext, SparkConf, HiveContext, AccumulatorParam

def decryptUncompressAndParseFile(filePathAndContents):
    '''each line of the file becomes an RDD record'''
    global acc_errCount, acc_errLog
    proc = subprocess.Popen(['custom_decrypt_program','--decrypt'], 
             stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (unzippedData, err) = proc.communicate(input=filePathAndContents[1])
    if len(err) > 0:  # problem reading the file
        acc_errCount.add(1)
        acc_errLog.add('Error: '+str(err)+' in file: '+filePathAndContents[0]+
            ', on host: '+ socket.gethostname()+' return code:'+str(returnCode))
        return []  # this is okay with flatMap
    records   = list()
    iterLines = iter(unzippedData.splitlines())
    for line in iterLines:
        #sys.stderr.write('Line: '+str(line)+'\n')
        values = [x.strip() for x in line.split('|')]
        ...
        records.append( (... extract data as appropriate from values into this tuple ...) )
    return records

class StringAccumulator(AccumulatorParam):
    ''' custom accumulator to holds strings '''
    def zero(self,initValue=""):
        return initValue
    def addInPlace(self,str1,str2):
        return str1.strip()+'\n'+str2.strip()

def main():
    ...
    global acc_errCount, acc_errLog
    acc_errCount  = sc.accumulator(0)
    acc_errLog    = sc.accumulator('',StringAccumulator())
    binaryFileTup = sc.binaryFiles(args.inputDir)
    # use flatMap instead of map, to handle corrupt files
    linesRdd = binaryFileTup.flatMap(decryptUncompressAndParseFile, True)
    df = sqlContext.createDataFrame(linesRdd, ourSchema())
    df.registerTempTable("dataTable")
    ...

自定义字符串累加器在识别损坏的输入文件时非常有用。

相关问题