sparking流无法写入hdfs路径

uemypmqf  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(367)

我使用的是spark-sql-2.4.1v和kafka0.10.x,使用的是java1.8。

Dataset<Row> dataSet= sparkSession
                      .readStream()
                      .format("kafka")
                      .option("subscribe", INFO_TOPIC)
                      .option("startingOffsets", "latest")
                      .option("enable.auto.commit", false)
                      .option("maxOffsetsPerTrigger", 1000)
                      .option("auto.offset.reset", "latest")
                      .option("failOnDataLoss", false)
                      .load();

StreamingQuery query = dataSet.writeStream()
        .format(PARQUET_FORMAT)
        .option("path", parqetFileName)
        .option("checkpointLocation", checkPtLocation)
        .trigger(Trigger.ProcessingTime("15 seconds"))
        .start();

query.awaitTermination();

在将数据写入我的hdfs路径(即parqetfilename)之后,它失败并出现以下错误。

[DataStreamer for file /user/parquet/raw/part-00001-7cba7fa3-a98f-442d-9584-b71085b7cd82-c000.snappy.parquet] WARN  org.apache.hadoop.hdfs.DataStreamer - Caught exception
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1249)
        at java.lang.Thread.join(Thread.java:1323)
        at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:980)
        at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:630)
        at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:807)

这里有什么问题,怎么解决?

yi0zb3m4

yi0zb3m41#

你一定有 streamContext.awaitTermination() 否则,应用程序将在启动流后立即退出。

相关问题