apachespark,带有自定义接收器的java流挂起

t40tm48m  于 2021-07-12  发布在  Java
关注(0)|答案(0)|浏览(191)

我已经使用java流api编写了一个自定义接收器。它的目的是读取一个加密的文件,解密它,然后推动进一步的阐述行。问题是,即使文件被正确读取、解密并推送到内存存储区,我也无法启动流媒体。使流生成输出的唯一方法是优雅地停止spark流上下文。
这是java自定义接收器的一部分,我将解密的行放入存储:

BufferedReader br = new BufferedReader(new InputStreamReader(zsIn, StandardCharsets.UTF_8));
for (int i = 0; !this.isStopped() && (decompressedLine = br.readLine()) != null; ++i) {
                this.store(decompressedLine);
}

在这里,我访问接收到的流并尝试执行一些操作:

SparkConf sparkConf = new SparkConf().setAppName("MyJavaStreaming");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));
JavaReceiverInputDStream<String> customReceiverStream = ssc.receiverStream(new SparkDecodingReceiver(args[0], args[1]));
customReceiverStream.foreachRDD((rdd, time) -> {
    SparkSession singletonSpark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
    JavaRDD<InputFileFlatStructure> rowRDD = rdd.map(line -> {
        InputFileFlatStructure inputRecord = new InputFileFlatStructure();
        if (!line.isEmpty()) {
            inputRecord.setValue(line);
        }
        return inputRecord;
    });

    Dataset<Row> wordsDataFrame = singletonSpark.createDataFrame(rowRDD, InputFileFlatStructure.class);
    wordsDataFrame.coalesce(1).write().format("csv").option("sep", "|").save("/nfs/output");

});

ssc.start();
ssc.awaitTermination();

使用此代码,进程将挂起,并且从不刷新文件系统上接收到的流。除了自定义接收器和spark store的使用之外,代码与此示例非常相似:https://github.com/apache/spark/blob/v3.1.1/examples/src/main/java/org/apache/spark/examples/streaming/javasqlnetworkwordcount.java 我的代码里少了什么吗?如何启动流处理并将数据集写入文件系统?谢谢你的帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题