下面是我的流程:
GetFile > ExecuteSparkInteractive > PutFile
我想从中读取文件 GetFile
处理器输入 ExecuteSparkInteractive
处理器,应用一些转换并将其放置在某个位置。下面是我的流程
我写道 spark scala code
低于 code
Spark处理机部分:
val sc1=sc.textFile("local_path")
sc1.foreach(println)
流中没有发生任何事情。那么我如何使用getfile处理器读取spark处理器中的文件呢。
第二部分:
我试过下面的流程只是为了练习:
ExecuteScript > PutFile > LogMessage
我在executescript processor中提到了以下代码:
readFile = open("/home/cloudera/Desktop/sample/data","r")
for line in readFile:
lines = line.strip()
finalline = re.sub(pattern='((?<=[0-9])[0-9]|(?<=\.)[0-9])',repl='X',string=lines)
readFile = open("/home/cloudera/Desktop/sample/data","w")
readFile.write(finalline)
代码工作正常,但不会将格式化数据写入目标文件夹。所以我在这里哪里出错了。另外,我在本地机器上安装了pandas,并从executescript处理器运行pandas代码,但nifi不读取pandas模块。为什么会这样?我尽力了。此外,我找不到任何相关的链接,我可以得到这个基本流
1条答案
按热度按时间p1tboqfb1#
其实不是这样的。。。getfile正在提取nifi节点的本地文件,并将它们带到nifi流中进行处理。executesparkinteractive启动远程spark群集上的spark作业,它不会将数据传输到spark。所以你可能想把数据放在spark可以访问的地方,比如getfile->puthdfs->executesparkinteractive。