如何在nifi中从getfilespark处理器读取文件

rnmwe5a2  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(601)

下面是我的流程:

GetFile > ExecuteSparkInteractive > PutFile

我想从中读取文件 GetFile 处理器输入 ExecuteSparkInteractive 处理器,应用一些转换并将其放置在某个位置。下面是我的流程

我写道 spark scala code 低于 code Spark处理机部分:

val sc1=sc.textFile("local_path")
sc1.foreach(println)

流中没有发生任何事情。那么我如何使用getfile处理器读取spark处理器中的文件呢。
第二部分:
我试过下面的流程只是为了练习:

ExecuteScript > PutFile > LogMessage

我在executescript processor中提到了以下代码:

readFile = open("/home/cloudera/Desktop/sample/data","r")
for line in readFile:
    lines = line.strip()
    finalline = re.sub(pattern='((?<=[0-9])[0-9]|(?<=\.)[0-9])',repl='X',string=lines)
readFile = open("/home/cloudera/Desktop/sample/data","w")
readFile.write(finalline)

代码工作正常,但不会将格式化数据写入目标文件夹。所以我在这里哪里出错了。另外,我在本地机器上安装了pandas,并从executescript处理器运行pandas代码,但nifi不读取pandas模块。为什么会这样?我尽力了。此外,我找不到任何相关的链接,我可以得到这个基本流

p1tboqfb

p1tboqfb1#

其实不是这样的。。。getfile正在提取nifi节点的本地文件,并将它们带到nifi流中进行处理。executesparkinteractive启动远程spark群集上的spark作业,它不会将数据传输到spark。所以你可能想把数据放在spark可以访问的地方,比如getfile->puthdfs->executesparkinteractive。

相关问题