我有一个spark代码,它使用append模式以json格式将Dataframe保存到hdfs位置(日期分区位置)。
df.write.mode("append").format('json').save(hdfsPath)
sample hdfs location : /tmp/table1/datepart=20190903
我正在使用nifi集群中上游的数据。nifi集群中的每个节点将为消耗的数据创建一个流文件。我的spark代码正在处理这个流文件。随着nifi的分发,我的spark代码将从不同的nifi节点并行执行,试图将数据保存到相同的hdfs位置。
我不能将spark作业的输出存储在不同的目录中,因为我的数据是按日期分区的。
从过去14天开始,此进程每天运行一次,我的spark作业因不同的错误失败了4次。第一个错误:
java.io.IOException: Failed to rename FileStatus{path=hdfs://tmp/table1/datepart=20190824/_temporary/0/task_20190824020604_0000_m_000000/part-00000-101aa2e2-85da-4067-9769-b4f6f6b8f276-c000.json; isDirectory=false; length=0; replication=3; blocksize=268435456; modification_time=1566630365451; access_time=1566630365034; owner=hive; group=hive; permission=rwxrwx--x; isSymlink=false} to hdfs://tmp/table1/datepart=20190824/part-00000-101aa2e2-85da-4067-9769-b4f6f6b8f276-c000.json
第二个错误:
java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190825/_temporary/0 does not exist.
第三个错误:
java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190901/_temporary/0/task_20190901020450_0000_m_000000 does not exist.
第四个错误:
java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190903/_temporary/0 does not exist.
以下是问题:
我无法再重现这种情景。怎么做?
在所有4种情况下,错误都与临时目录有关。是因为两个或多个作业并行地试图将数据保存在同一个hdfs位置,而在执行该作业时,a可能删除了作业b的临时目录(因为位置相同,而且所有文件夹都有公用名/\u目录/0/)
如果是并发问题,那么我可以从主节点运行所有nifi处理器,但这样会降低性能。
需要你的Maven建议。
提前谢谢。
1条答案
按热度按时间qcuzuvrc1#
问题似乎是,两个spark节点独立地尝试写入同一个位置,这会导致冲突,因为最快的一个节点将在第二个节点预期之前清除工作目录。
最直接的解决方案可能是避免这种情况。
我了解如何使用nifi和spark,nifi运行的节点也决定spark运行的节点(存在1-1关系?)
如果是这种情况,您应该能够通过将nifi中的工作路由到互不干扰的节点来解决这个问题。查看依赖于属性的负载平衡策略(队列的属性)。当然,您需要定义正确的属性,但是像目录或表名这样的东西应该很有用。